From 4fa46030ff7d114736b3ea4d057b2d456686da49 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 15 May 2024 07:18:20 -0700 Subject: [PATCH] Refactor the probabilistic sampler processor; add FailClosed configuration, prepare for OTEP 235 support (#31946) **Description:** Refactors the probabilistic sampling processor to prepare it for more OTEP 235 support. This clarifies existing inconsistencies between the tracing and logging samplers; see the updated README. The tracing priority mechanism applies a 0% or 100% sampling override (e.g., "1" implies 100% sampling), whereas the logging sampling priority mechanism supports variable-probability override (e.g., "1" implies 1% sampling). This pins down cases where no randomness is available, and organizes the code to improve readability. A new type called `randomnessNamer` carries the randomness information (from the sampling package) and the name of the policy that derived it. When sampling priority causes the effective sampling probability to change, the value "sampling.priority" replaces the source of randomness, which is currently limited to "trace_id_hash" or, for logs, the name of the randomness-source attribute. While working on #31894, I discovered that some inputs fall through to the hash function with zero bytes of input randomness. The hash function, computed on an empty input (for logs) or on 16 bytes of zeros (which OTel calls an invalid trace ID), would produce a fixed random value. So, for example, a log record with no TraceID and no randomness attribute value would pass the sampler only at configured percentages of approximately 82.9% and above. In the refactored code, an error is returned when there is no input randomness. A new boolean configuration field determines the outcome when there is an error extracting randomness from an item of telemetry. By default, items of telemetry with errors will not pass through the sampler. When `FailClosed` is set to false, items of telemetry with errors will pass through the sampler. The original hash function, which uses 14 bits of information, is structured as an "acceptance threshold": the test for sampling ultimately translates into a positive decision when `Randomness < AcceptThreshold`. In the OTEP 235 scheme, thresholds are rejection thresholds, so this PR converts the original 14-bit accept threshold into a 56-bit reject threshold, using the Threshold and Randomness types from the sampling package. Reframed in this way, the subsequent PR (i.e., #31894) can seamlessly convey the effective sampling probability using OTEP 235 semantic conventions. Note that both the traces and logs processors are now reduced to a call like this: ``` return commonShouldSampleLogic( ctx, l, lsp.sampler, lsp.failClosed, lsp.sampler.randomnessFromLogRecord, lsp.priorityFunc, "logs sampler", lsp.logger, ) ``` which is a generic function that handles the common logic on a per-item basis and ends in a single metric event. This structure makes it clear how traces and logs are currently processed differently, with different prioritization schemes. This structure also makes it easy to introduce new sampler modes, as shown in #31894. After this and #31940 merge, the changes in #31894 will be relatively simple to review as the third part in a series. **Link to tracking Issue:** Depends on #31940. Part of #31918. **Testing:** Added. Existing tests already cover the exact random behavior of the current hashing mechanism. Even more testing will be introduced with the last step of this series.
Note that https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/32360 is added ahead of this test to ensure refactoring does not change results. **Documentation:** Added. --------- Co-authored-by: Kent Quirk --- .../probabilisticsampler_failclosed.yaml | 27 ++ cmd/configschema/go.mod | 3 + cmd/otelcontribcol/builder-config.yaml | 1 + cmd/otelcontribcol/go.mod | 3 + connector/datadogconnector/go.mod | 2 + exporter/datadogexporter/go.mod | 3 + .../datadogexporter/integrationtest/go.mod | 2 + go.mod | 3 + .../probabilisticsamplerprocessor/README.md | 190 ++++++++--- .../probabilisticsamplerprocessor/config.go | 10 + .../config_test.go | 24 +- .../probabilisticsamplerprocessor/factory.go | 5 + .../factory_test.go | 6 +- .../probabilisticsamplerprocessor/go.mod | 3 + .../logsprocessor.go | 170 +++++---- .../logsprocessor_test.go | 91 +++++ .../sampler_mode.go | 322 ++++++++++++++++++ .../sampler_mode_test.go | 41 +++ .../testdata/config.yaml | 11 +- .../{invalid.yaml => invalid_negative.yaml} | 7 +- .../tracesprocessor.go | 121 ++++--- .../tracesprocessor_test.go | 166 ++++++++- 22 files changed, 1021 insertions(+), 190 deletions(-) create mode 100644 .chloggen/probabilisticsampler_failclosed.yaml create mode 100644 processor/probabilisticsamplerprocessor/sampler_mode.go create mode 100644 processor/probabilisticsamplerprocessor/sampler_mode_test.go rename processor/probabilisticsamplerprocessor/testdata/{invalid.yaml => invalid_negative.yaml} (59%) diff --git a/.chloggen/probabilisticsampler_failclosed.yaml b/.chloggen/probabilisticsampler_failclosed.yaml new file mode 100644 index 000000000000..93304bc9fc5d --- /dev/null +++ b/.chloggen/probabilisticsampler_failclosed.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: probabilisticsamplerprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds the `FailClosed` flag to solidify current behavior when randomness source is missing. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31918] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index 5c05121bc3e5..5abb6a6b440d 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -566,6 +566,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.100.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.100.0 // indirect @@ -1226,3 +1227,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/connector/graf replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/sumologicextension => ../../extension/sumologicextension replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkenterprisereceiver => ../../receiver/splunkenterprisereceiver + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling diff --git a/cmd/otelcontribcol/builder-config.yaml b/cmd/otelcontribcol/builder-config.yaml index 4d947b300d52..fa322d9b0a04 100644 --- a/cmd/otelcontribcol/builder-config.yaml +++ b/cmd/otelcontribcol/builder-config.yaml @@ -472,3 +472,4 @@ replaces: - github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages => ../../extension/opampcustommessages - github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3provider => ../../confmap/provider/s3provider - github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/secretsmanagerprovider => ../../confmap/provider/secretsmanagerprovider + - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 91830c82ce43..fd4919af3935 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -631,6 +631,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.100.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.100.0 // indirect @@ -1293,3 +1294,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/opam replace github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3provider => ../../confmap/provider/s3provider replace github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/secretsmanagerprovider => ../../confmap/provider/secretsmanagerprovider + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => 
../../pkg/sampling diff --git a/connector/datadogconnector/go.mod b/connector/datadogconnector/go.mod index 5349319822ad..4a4e1c15a55c 100644 --- a/connector/datadogconnector/go.mod +++ b/connector/datadogconnector/go.mod @@ -332,3 +332,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/stor replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37 replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../processor/transformprocessor + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling diff --git a/exporter/datadogexporter/go.mod b/exporter/datadogexporter/go.mod index f6063492be9f..538e566e0c07 100644 --- a/exporter/datadogexporter/go.mod +++ b/exporter/datadogexporter/go.mod @@ -250,6 +250,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.100.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.100.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect @@ -427,3 +428,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/tail replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage => ../../extension/storage replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../processor/transformprocessor + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling diff --git a/exporter/datadogexporter/integrationtest/go.mod b/exporter/datadogexporter/integrationtest/go.mod index dbd6ee3cebf9..6170deb4efc4 100644 --- a/exporter/datadogexporter/integrationtest/go.mod +++ b/exporter/datadogexporter/integrationtest/go.mod @@ -341,3 +341,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/prob replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver => ../../../receiver/prometheusreceiver replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../../processor/transformprocessor + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../../pkg/sampling diff --git a/go.mod b/go.mod index f17090b48f2a..dd8ca961ed4e 100644 --- a/go.mod +++ b/go.mod @@ -578,6 +578,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.100.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.100.0 // indirect @@ -1226,3 +1227,5 @@ replace 
github.com/open-telemetry/opentelemetry-collector-contrib/extension/enco replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension => ./extension/ackextension replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkenterprisereceiver => ./receiver/splunkenterprisereceiver + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ./pkg/sampling diff --git a/processor/probabilisticsamplerprocessor/README.md b/processor/probabilisticsamplerprocessor/README.md index ae015aae08b1..758d6ebf518c 100644 --- a/processor/probabilisticsamplerprocessor/README.md +++ b/processor/probabilisticsamplerprocessor/README.md @@ -15,51 +15,159 @@ [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib -The probabilistic sampler supports two types of sampling for traces: - -1. `sampling.priority` [semantic -convention](https://github.com/opentracing/specification/blob/master/semantic_conventions.md#span-tags-table) -as defined by OpenTracing -1. Trace ID hashing - -The `sampling.priority` semantic convention takes priority over trace ID hashing. As the name -implies, trace ID hashing samples based on hash values determined by trace IDs. See [Hashing](#hashing) for more information. +The probabilistic sampler processor supports several modes of sampling +for spans and log records. Sampling is performed on a per-request +basis, considering individual items statelessly. For whole-trace +sampling, see +[tailsamplingprocessor](../tailsamplingprocessor/README.md). + +For trace spans, this sampler supports probabilistic sampling based on +a configured sampling percentage applied to the TraceID. In addition, +the sampler recognizes a `sampling.priority` annotation, which can +force the sampler to apply 0% or 100% sampling. + +For log records, this sampler can be configured to use the embedded +TraceID and follow the same logic as applied to spans. When the +TraceID is not defined, the sampler can be configured to apply hashing +to a selected log record attribute. This sampler also supports +sampling priority. + +## Consistency guarantee + +A consistent probability sampler is a Sampler that supports +independent sampling decisions for each span or log record in a group +(e.g. by TraceID), while maximizing the potential for completeness as +follows. + +Consistent probability sampling requires that for any span in a given +trace, if a Sampler with lesser sampling probability selects the span +for sampling, then the span would also be selected by a Sampler +configured with greater sampling probability. + +## Completeness property + +A trace is complete when all of its members are sampled. A +"sub-trace" is complete when all of its descendants are sampled. + +Ordinarily, Trace and Logging SDKs configure parent-based samplers, +which decide to sample based on the Context, because doing so leads to +completeness. + +When non-root spans or logs make independent sampling decisions +instead of using the parent-based approach (e.g., using the +`TraceIDRatioBased` sampler for a non-root span), incompleteness may +result, and when spans and log records are independently sampled in a +processor, as by this component, the same potential for incompleteness +arises. The consistency guarantee helps minimize this issue, as the +sketch below illustrates.
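+
+To make this concrete, the following is a minimal, hypothetical
+sketch of the consistency property. It uses only `pkg/sampling` calls
+that appear elsewhere in this change
+(`ProbabilityToThresholdWithPrecision`, `UnsignedToRandomness`, and
+`ShouldSample`); the probability and randomness values are
+illustrative, not part of this component:
+
+```go
+package main
+
+import (
+	"fmt"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling"
+)
+
+func main() {
+	// Rejection thresholds for 10% and 50% sampling, at 4 hex digits
+	// of precision.
+	t10, _ := sampling.ProbabilityToThresholdWithPrecision(0.10, 4)
+	t50, _ := sampling.ProbabilityToThresholdWithPrecision(0.50, 4)
+
+	// A single item's randomness: an arbitrary 56-bit value.
+	rnd, _ := sampling.UnsignedToRandomness(0x1a2b3c4d5e6f70)
+
+	// Both samplers compare the same randomness value against their
+	// own thresholds, so whenever the 10% sampler keeps an item, the
+	// 50% sampler necessarily keeps it too.
+	fmt.Println(t10.ShouldSample(rnd), t50.ShouldSample(rnd))
+}
+```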
+ +Consistent probability samplers can be safely used with a mixture of +probabilities and preserve sub-trace completeness, provided that child +spans and log records are sampled with probability greater than or +equal to the parent context. + +For example, using 1%, 10%, and 50% probabilities in a consistent +probability scheme, the 50% sampler must sample when the 10% sampler +does, and the 10% sampler must sample when the 1% sampler does. A +three-tier system could be configured with 1% sampling in the first +tier, 10% sampling in the second tier, and 50% sampling in the bottom +tier. In this configuration, 1% of traces will be complete, 10% of +traces will be sub-trace complete at the second tier, and 50% of +traces will be sub-trace complete at the third tier thanks to the +consistency property. + +These guidelines should be considered when deploying multiple +collectors with different sampling probabilities in a system. For +example, a collector serving frontend servers can be configured with +a smaller sampling probability than a collector serving backend servers, +without breaking sub-trace completeness. + +## Sampling randomness + +To achieve consistency, sampling randomness is taken from a +deterministic aspect of the input data. For traces pipelines, the +source of randomness is always the TraceID. For logs pipelines, the +source of randomness can be the TraceID or another log record +attribute, if configured. + +The `attribute_source` and `from_attribute` fields determine the +source of randomness used for log records. When `attribute_source` is +set to `traceID`, the TraceID will be used. When `attribute_source` +is set to `record` or the TraceID field is absent, the value of the +attribute named by `from_attribute` is taken as the source of +randomness (if configured). + +## Sampling priority + +The sampling priority mechanism is an override, which takes precedence +over the probabilistic decision in all modes. + +🛑 Compatibility note: Logs and Traces have different behavior. + +In traces pipelines, when the priority attribute has value 0, the +configured probability will be modified to 0% and the item will not +pass the sampler. When the priority attribute is non-zero, the +configured probability will be set to 100%. The sampling priority +attribute is not configurable, and is called `sampling.priority`. + +In logs pipelines, when the priority attribute has value 0, the +configured probability will be modified to 0%, and the item will not +pass the sampler. Otherwise, the logs sampling priority attribute is +interpreted as a percentage, with values >= 100 equal to 100% +sampling. The logs sampling priority attribute is configured via +`sampling_priority`. + +## Sampling algorithm + +### Hash seed + +The hash seed method uses the FNV hash function applied to either a +Trace ID (spans, log records), or to the value of a specified +attribute (only logs). The hashed value, presumed to be random, is +compared against a threshold value that corresponds with the sampling +percentage. + +This mode requires configuring the `hash_seed` field. This mode is +enabled when the `hash_seed` field is not zero, or when log records +are sampled with `attribute_source` set to `record`. + +In order for hashing to be consistent, all collectors for a given tier +(e.g. behind the same load balancer) must have the same +`hash_seed`. It is also possible to leverage a different `hash_seed` +at different collector tiers to support additional sampling +requirements.
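+
+For example, a hypothetical two-tier deployment might configure each
+tier as follows (two separate collector configurations, shown here as
+two YAML documents; the seeds and percentages are illustrative):
+
+```yaml
+# Tier 1 collectors: every collector in this tier shares one seed.
+processors:
+  probabilistic_sampler:
+    sampling_percentage: 10
+    hash_seed: 22
+---
+# Tier 2 collectors: a different seed, so this tier re-samples
+# independently of the tier-1 decisions.
+processors:
+  probabilistic_sampler:
+    sampling_percentage: 50
+    hash_seed: 23
+```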
+ +This mode uses 14 bits of sampling precision. + +### Error handling + +This processor considers it an error when the arriving data has no +randomness. This includes conditions where the TraceID field is +invalid (16 zero bytes) and where the log record attribute source has +zero bytes of information. + +By default, when there are errors determining sampling-related +information from an item of telemetry, the data will be refused. This +behavior can be changed by setting the `fail_closed` property to +false, in which case erroneous data will pass through the processor. + +## Configuration The following configuration options can be modified: -- `hash_seed` (no default): An integer used to compute the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed. -- `sampling_percentage` (default = 0): Percentage at which traces are sampled; >= 100 samples all traces -Examples: +- `sampling_percentage` (32-bit floating point, required): Percentage at which items are sampled; >= 100 samples all items, 0 rejects all items. +- `hash_seed` (32-bit unsigned integer, optional, default = 0): An integer used to seed the hash computation. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed. +- `fail_closed` (boolean, optional, default = true): Whether to reject items with sampling-related errors. -```yaml -processors: - probabilistic_sampler: - hash_seed: 22 - sampling_percentage: 15.3 -``` +### Logs-specific configuration -The probabilistic sampler supports sampling logs according to their trace ID, or by a specific log record attribute. - -The probabilistic sampler optionally may use a `hash_seed` to compute the hash of a log record. -This sampler samples based on hash values determined by log records. See [Hashing](#hashing) for more information. - -The following configuration options can be modified: -- `hash_seed` (no default, optional): An integer used to compute the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed. -- `sampling_percentage` (required): Percentage at which logs are sampled; >= 100 samples all logs, 0 rejects all logs. -- `attribute_source` (default = traceID, optional): defines where to look for the attribute in from_attribute. The allowed values are `traceID` or `record`. -- `from_attribute` (default = null, optional): The optional name of a log record attribute used for sampling purposes, such as a unique log record ID. The value of the attribute is only used if the trace ID is absent or if `attribute_source` is set to `record`. -- `sampling_priority` (default = null, optional): The optional name of a log record attribute used to set a different sampling priority from the `sampling_percentage` setting. 0 means to never sample the log record, and >= 100 means to always sample the log record. - -## Hashing - -In order for hashing to work, all collectors for a given tier (e.g. behind the same load balancer) -must have the same `hash_seed`. It is also possible to leverage a different `hash_seed` at -different collector tiers to support additional sampling requirements. Please refer to -[config.go](./config.go) for the config spec. +- `attribute_source` (string, optional, default = "traceID"): defines where to look for the attribute named by `from_attribute`. The allowed values are `traceID` or `record`.
+- `from_attribute` (string, optional, default = ""): The name of a log record attribute used for sampling purposes, such as a unique log record ID. The value of the attribute is only used if the trace ID is absent or if `attribute_source` is set to `record`. +- `sampling_priority` (string, optional, default = ""): The name of a log record attribute used to set a different sampling priority from the `sampling_percentage` setting. 0 means to never sample the log record, and >= 100 means to always sample the log record. Examples: -Sample 15% of the logs: +Sample 15% of log records according to trace ID using the OpenTelemetry +specification. + ```yaml processors: probabilistic_sampler: @@ -76,7 +184,8 @@ processors: from_attribute: logID # value is required if the source is not traceID ``` -Sample logs according to the attribute `priority`: +Give sampling priority to log records according to the attribute named +`priority`: ```yaml processors: @@ -85,6 +194,7 @@ processors: sampling_priority: priority ``` +## Detailed examples -Refer to [config.yaml](./testdata/config.yaml) for detailed -examples on using the processor. +Refer to [config.yaml](./testdata/config.yaml) for detailed examples +on using the processor. diff --git a/processor/probabilisticsamplerprocessor/config.go b/processor/probabilisticsamplerprocessor/config.go index 3502fa8bc429..c4bc83eb6b11 100644 --- a/processor/probabilisticsamplerprocessor/config.go +++ b/processor/probabilisticsamplerprocessor/config.go @@ -35,6 +35,16 @@ type Config struct { // different sampling rates, configuring different seeds avoids that. HashSeed uint32 `mapstructure:"hash_seed"` + // FailClosed indicates to not sample data (the processor will + // fail "closed") in case of error, such as failure to parse + // the tracestate field or missing the randomness attribute. + // + // By default, failure cases are not sampled (the processor + // fails "closed"). Sampling priority-based decisions are made after + // FailClosed is processed, making it possible to sample + // despite errors using priority. + FailClosed bool `mapstructure:"fail_closed"` + // AttributeSource (logs only) defines where to look for the attribute named by FromAttribute. The allowed values are // `traceID` or `record`. Default is `traceID`.
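+	// When set to `record` with an empty FromAttribute, log records
+	// have no randomness source; such records are handled according
+	// to the FailClosed setting.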
AttributeSource `mapstructure:"attribute_source"` diff --git a/processor/probabilisticsamplerprocessor/config_test.go b/processor/probabilisticsamplerprocessor/config_test.go index 90711d343552..d236c519c53d 100644 --- a/processor/probabilisticsamplerprocessor/config_test.go +++ b/processor/probabilisticsamplerprocessor/config_test.go @@ -26,8 +26,8 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, ""), expected: &Config{ SamplingPercentage: 15.3, - HashSeed: 22, AttributeSource: "traceID", + FailClosed: true, }, }, { @@ -38,6 +38,7 @@ func TestLoadConfig(t *testing.T) { AttributeSource: "record", FromAttribute: "foo", SamplingPriority: "bar", + FailClosed: true, }, }, } @@ -63,12 +64,21 @@ } func TestLoadInvalidConfig(t *testing.T) { - factories, err := otelcoltest.NopFactories() - require.NoError(t, err) + for _, test := range []struct { + file string + contains string + }{ + {"invalid_negative.yaml", "negative sampling rate"}, + } { + t.Run(test.file, func(t *testing.T) { + factories, err := otelcoltest.NopFactories() + require.NoError(t, err) - factory := NewFactory() - factories.Processors[metadata.Type] = factory + factory := NewFactory() + factories.Processors[metadata.Type] = factory - _, err = otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "invalid.yaml"), factories) - require.ErrorContains(t, err, "negative sampling rate: -15.30") + _, err = otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", test.file), factories) + require.ErrorContains(t, err, test.contains) + }) + } } diff --git a/processor/probabilisticsamplerprocessor/factory.go b/processor/probabilisticsamplerprocessor/factory.go index 8302e7471840..481f37a600e0 100644 --- a/processor/probabilisticsamplerprocessor/factory.go +++ b/processor/probabilisticsamplerprocessor/factory.go @@ -20,6 +20,10 @@ import ( var onceMetrics sync.Once +// The default precision is 4 hex digits, slightly more than the original +// component logic's 14 bits of precision. +const defaultPrecision = 4 + // NewFactory returns a new factory for the Probabilistic sampler processor.
func NewFactory() processor.Factory { onceMetrics.Do(func() { @@ -37,6 +41,7 @@ func NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &Config{ AttributeSource: defaultAttributeSource, + FailClosed: true, } } diff --git a/processor/probabilisticsamplerprocessor/factory_test.go b/processor/probabilisticsamplerprocessor/factory_test.go index cd4a6246c649..8818f49eb72d 100644 --- a/processor/probabilisticsamplerprocessor/factory_test.go +++ b/processor/probabilisticsamplerprocessor/factory_test.go @@ -15,22 +15,22 @@ import ( func TestCreateDefaultConfig(t *testing.T) { cfg := createDefaultConfig() - assert.NotNil(t, cfg, "failed to create default config") assert.NoError(t, componenttest.CheckConfigStruct(cfg)) + assert.NotNil(t, cfg, "failed to create default config") } func TestCreateProcessor(t *testing.T) { cfg := createDefaultConfig() set := processortest.NewNopCreateSettings() tp, err := createTracesProcessor(context.Background(), set, cfg, consumertest.NewNop()) - assert.NotNil(t, tp) assert.NoError(t, err, "cannot create trace processor") + assert.NotNil(t, tp) } func TestCreateProcessorLogs(t *testing.T) { cfg := createDefaultConfig() set := processortest.NewNopCreateSettings() tp, err := createLogsProcessor(context.Background(), set, cfg, consumertest.NewNop()) - assert.NotNil(t, tp) assert.NoError(t, err, "cannot create logs processor") + assert.NotNil(t, tp) } diff --git a/processor/probabilisticsamplerprocessor/go.mod b/processor/probabilisticsamplerprocessor/go.mod index ca2f32172fbe..e05351b90de2 100644 --- a/processor/probabilisticsamplerprocessor/go.mod +++ b/processor/probabilisticsamplerprocessor/go.mod @@ -4,6 +4,7 @@ go 1.21.0 require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.100.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.100.0 github.com/stretchr/testify v1.9.0 go.opencensus.io v0.24.0 go.opentelemetry.io/collector/component v0.100.1-0.20240509190532-c555005fcc80 @@ -112,3 +113,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling diff --git a/processor/probabilisticsamplerprocessor/logsprocessor.go b/processor/probabilisticsamplerprocessor/logsprocessor.go index d5f2ef3a75f8..d10ad88fa586 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor.go @@ -5,38 +5,79 @@ package probabilisticsamplerprocessor // import "github.com/open-telemetry/opent import ( "context" - "strconv" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) -type logSamplerProcessor struct { - scaledSamplingRate uint32 - hashSeed uint32 - traceIDEnabled bool - samplingSource string - samplingPriority string - logger *zap.Logger +type logsProcessor struct { + sampler dataSampler + + samplingPriority string + failClosed bool + logger *zap.Logger +} + +type recordCarrier 
struct { + record plog.LogRecord +} + +var _ samplingCarrier = &recordCarrier{} + +func newLogRecordCarrier(l plog.LogRecord) samplingCarrier { + return &recordCarrier{ + record: l, + } +} + +func (*neverSampler) randomnessFromLogRecord(_ plog.LogRecord) (randomnessNamer, samplingCarrier, error) { + // We return a fake randomness value, since it will not be used. + // This avoids a consistency check error for missing randomness. + return newSamplingPriorityMethod(sampling.AllProbabilitiesRandomness), nil, nil +} + +// randomnessFromLogRecord (hashingSampler) uses a hash function over +// the TraceID or, if the TraceID is unavailable, over the value of +// the configured randomness-source attribute. +func (th *hashingSampler) randomnessFromLogRecord(logRec plog.LogRecord) (randomnessNamer, samplingCarrier, error) { + rnd := newMissingRandomnessMethod() + lrc := newLogRecordCarrier(logRec) + + if th.logsTraceIDEnabled { + value := logRec.TraceID() + if !value.IsEmpty() { + rnd = newTraceIDHashingMethod(randomnessFromBytes(value[:], th.hashSeed)) + } + } + + if isMissing(rnd) && th.logsRandomnessSourceAttribute != "" { + if value, ok := logRec.Attributes().Get(th.logsRandomnessSourceAttribute); ok { + by := getBytesFromValue(value) + if len(by) > 0 { + rnd = newAttributeHashingMethod( + th.logsRandomnessSourceAttribute, + randomnessFromBytes(by, th.hashSeed), + ) + } + } + } + + return rnd, lrc, nil } // newLogsProcessor returns a processor.LogsProcessor that will perform head sampling according to the given // configuration. func newLogsProcessor(ctx context.Context, set processor.CreateSettings, nextConsumer consumer.Logs, cfg *Config) (processor.Logs, error) { - - lsp := &logSamplerProcessor{ - scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor), - hashSeed: cfg.HashSeed, - traceIDEnabled: cfg.AttributeSource == traceIDAttributeSource, - samplingPriority: cfg.SamplingPriority, - samplingSource: cfg.FromAttribute, - logger: set.Logger, + lsp := &logsProcessor{ + sampler: makeSampler(cfg), + samplingPriority: cfg.SamplingPriority, + failClosed: cfg.FailClosed, + logger: set.Logger, } return processorhelper.NewLogsProcessor( @@ -48,48 +89,20 @@ func newLogsProcessor(ctx context.Context, set processor.CreateSettings, nextCon processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true})) } -func (lsp *logSamplerProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { - ld.ResourceLogs().RemoveIf(func(rl plog.ResourceLogs) bool { +func (lsp *logsProcessor) processLogs(ctx context.Context, logsData plog.Logs) (plog.Logs, error) { + logsData.ResourceLogs().RemoveIf(func(rl plog.ResourceLogs) bool { rl.ScopeLogs().RemoveIf(func(ill plog.ScopeLogs) bool { ill.LogRecords().RemoveIf(func(l plog.LogRecord) bool { - - tagPolicyValue := "always_sampling" - // pick the sampling source.
- var lidBytes []byte - if lsp.traceIDEnabled && !l.TraceID().IsEmpty() { - value := l.TraceID() - tagPolicyValue = "trace_id_hash" - lidBytes = value[:] - } - if lidBytes == nil && lsp.samplingSource != "" { - if value, ok := l.Attributes().Get(lsp.samplingSource); ok { - tagPolicyValue = lsp.samplingSource - lidBytes = getBytesFromValue(value) - } - } - priority := lsp.scaledSamplingRate - if lsp.samplingPriority != "" { - if localPriority, ok := l.Attributes().Get(lsp.samplingPriority); ok { - switch localPriority.Type() { - case pcommon.ValueTypeDouble: - priority = uint32(localPriority.Double() * percentageScaleFactor) - case pcommon.ValueTypeInt: - priority = uint32(float64(localPriority.Int()) * percentageScaleFactor) - } - } - } - - sampled := computeHash(lidBytes, lsp.hashSeed)&bitMaskHashBuckets < priority - var err error = stats.RecordWithTags( + return !commonShouldSampleLogic( ctx, - []tag.Mutator{tag.Upsert(tagPolicyKey, tagPolicyValue), tag.Upsert(tagSampledKey, strconv.FormatBool(sampled))}, - statCountLogsSampled.M(int64(1)), + l, + lsp.sampler, + lsp.failClosed, + lsp.sampler.randomnessFromLogRecord, + lsp.priorityFunc, + "logs sampler", + lsp.logger, ) - if err != nil { - lsp.logger.Error(err.Error()) - } - - return !sampled }) // Filter out empty ScopeLogs return ill.LogRecords().Len() == 0 @@ -97,15 +110,46 @@ func (lsp *logSamplerProcessor) processLogs(ctx context.Context, ld plog.Logs) ( // Filter out empty ResourceLogs return rl.ScopeLogs().Len() == 0 }) - if ld.ResourceLogs().Len() == 0 { - return ld, processorhelper.ErrSkipProcessingData + if logsData.ResourceLogs().Len() == 0 { + return logsData, processorhelper.ErrSkipProcessingData + } + return logsData, nil +} + +func (lsp *logsProcessor) priorityFunc(logRec plog.LogRecord, rnd randomnessNamer, threshold sampling.Threshold) (randomnessNamer, sampling.Threshold) { + // Note: in logs, unlike traces, the sampling priority + // attribute is interpreted as a request to be sampled. 
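+	// For example, a priority attribute value of 25 raises the
+	// effective sampling probability to at least 25%, while a value
+	// of 0 prevents the record from being sampled.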
+ if lsp.samplingPriority != "" { + priorityThreshold := lsp.logRecordToPriorityThreshold(logRec) + + if priorityThreshold == sampling.NeverSampleThreshold { + threshold = priorityThreshold + rnd = newSamplingPriorityMethod(rnd.randomness()) // override policy name + } else if sampling.ThresholdLessThan(priorityThreshold, threshold) { + threshold = priorityThreshold + rnd = newSamplingPriorityMethod(rnd.randomness()) // override policy name + } } - return ld, nil + return rnd, threshold } -func getBytesFromValue(value pcommon.Value) []byte { - if value.Type() == pcommon.ValueTypeBytes { - return value.Bytes().AsRaw() +func (lsp *logsProcessor) logRecordToPriorityThreshold(logRec plog.LogRecord) sampling.Threshold { + if localPriority, ok := logRec.Attributes().Get(lsp.samplingPriority); ok { + // Potentially raise the sampling probability to minProb + minProb := 0.0 + switch localPriority.Type() { + case pcommon.ValueTypeDouble: + minProb = localPriority.Double() / 100.0 + case pcommon.ValueTypeInt: + minProb = float64(localPriority.Int()) / 100.0 + } + if minProb != 0 { + if th, err := sampling.ProbabilityToThresholdWithPrecision(minProb, defaultPrecision); err == nil { + // The record has supplied a valid alternative sampling probability + return th + } + + } } - return []byte(value.AsString()) + return sampling.NeverSampleThreshold } diff --git a/processor/probabilisticsamplerprocessor/logsprocessor_test.go b/processor/probabilisticsamplerprocessor/logsprocessor_test.go index 2e084ac38809..e80ab3c344fd 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor_test.go @@ -5,6 +5,7 @@ package probabilisticsamplerprocessor import ( "context" + "fmt" "testing" "time" @@ -15,6 +16,8 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/processor/processortest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" ) func TestNewLogsProcessor(t *testing.T) { @@ -55,6 +58,10 @@ func TestNewLogsProcessor(t *testing.T) { } func TestLogsSampling(t *testing.T) { + // Note: in the tests below, &Config{} objects are created w/o + // use of factory-supplied defaults. Therefore, we explicitly + // set FailClosed: true in cases where the legacy test + // included coverage of data with missing randomness. tests := []struct { name string cfg *Config @@ -79,7 +86,12 @@ func TestLogsSampling(t *testing.T) { cfg: &Config{ SamplingPercentage: 50, AttributeSource: traceIDAttributeSource, + FailClosed: true, }, + // Note: This count excludes one empty TraceID + // that fails closed. If this test had been + // written for 63% or greater, it would have been + // counted. received: 45, }, { @@ -89,6 +101,7 @@ func TestLogsSampling(t *testing.T) { AttributeSource: recordAttributeSource, FromAttribute: "foo", }, + received: 0, }, { @@ -106,6 +119,7 @@ func TestLogsSampling(t *testing.T) { SamplingPercentage: 50, AttributeSource: recordAttributeSource, FromAttribute: "foo", + FailClosed: true, }, received: 23, }, @@ -115,6 +129,7 @@ func TestLogsSampling(t *testing.T) { SamplingPercentage: 50, AttributeSource: recordAttributeSource, FromAttribute: "bar", + FailClosed: true, }, received: 29, // probabilistic... 
doesn't yield the same results as foo }, @@ -149,6 +164,8 @@ func TestLogsSampling(t *testing.T) { record.SetTimestamp(pcommon.Timestamp(time.Unix(1649400860, 0).Unix())) record.SetSeverityNumber(plog.SeverityNumberDebug) ib := byte(i) + // Note this TraceID is invalid when i==0. Since this test + // encodes historical behavior, we leave it as-is. traceID := [16]byte{0, 0, 0, 0, 0, 0, 0, 0, ib, ib, ib, ib, ib, ib, ib, ib} record.SetTraceID(traceID) // set half of records with a foo (bytes) and a bar (string) attribute @@ -173,3 +190,77 @@ func TestLogsSampling(t *testing.T) { }) } } + +func TestLogsMissingRandomness(t *testing.T) { + type test struct { + pct float32 + source AttributeSource + failClosed bool + sampled bool + } + + for _, tt := range []test{ + {0, recordAttributeSource, true, false}, + {50, recordAttributeSource, true, false}, + {100, recordAttributeSource, true, false}, + + {0, recordAttributeSource, false, false}, + {50, recordAttributeSource, false, true}, + {100, recordAttributeSource, false, true}, + + {0, traceIDAttributeSource, true, false}, + {50, traceIDAttributeSource, true, false}, + {100, traceIDAttributeSource, true, false}, + + {0, traceIDAttributeSource, false, false}, + {50, traceIDAttributeSource, false, true}, + {100, traceIDAttributeSource, false, true}, + } { + t.Run(fmt.Sprint(tt.pct, "_", tt.source, "_", tt.failClosed), func(t *testing.T) { + + ctx := context.Background() + logs := plog.NewLogs() + record := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + record.SetTraceID(pcommon.TraceID{}) // invalid TraceID + + cfg := &Config{ + SamplingPercentage: tt.pct, + HashSeed: defaultHashSeed, + FailClosed: tt.failClosed, + AttributeSource: tt.source, + FromAttribute: "unused", + } + + sink := new(consumertest.LogsSink) + set := processortest.NewNopCreateSettings() + // Note: there is a debug-level log we are expecting when FailClosed + // causes a drop. 
+ logger, observed := observer.New(zap.DebugLevel) + set.Logger = zap.New(logger) + + lp, err := newLogsProcessor(ctx, set, sink, cfg) + require.NoError(t, err) + + err = lp.ConsumeLogs(ctx, logs) + require.NoError(t, err) + + sampledData := sink.AllLogs() + if tt.sampled { + require.Equal(t, 1, len(sampledData)) + assert.Equal(t, 1, sink.LogRecordCount()) + } else { + require.Equal(t, 0, len(sampledData)) + assert.Equal(t, 0, sink.LogRecordCount()) + } + + if tt.pct != 0 { + // pct==0 bypasses the randomness check + require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All()) + require.Contains(t, observed.All()[0].Message, "logs sampler") + require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), "missing randomness") + } else { + require.Equal(t, 0, len(observed.All()), "should have no logs: %v", observed.All()) + } + }) + } +} diff --git a/processor/probabilisticsamplerprocessor/sampler_mode.go b/processor/probabilisticsamplerprocessor/sampler_mode.go new file mode 100644 index 000000000000..6bf09caa271f --- /dev/null +++ b/processor/probabilisticsamplerprocessor/sampler_mode.go @@ -0,0 +1,322 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package probabilisticsamplerprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor" + +import ( + "context" + "errors" + "fmt" + "strconv" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" +) + +const ( + // Hashing method: The constants below help translate user-friendly percentages + // to numbers directly used in sampling. + numHashBucketsLg2 = 14 + numHashBuckets = 0x4000 // Using a power of 2 to avoid division. + bitMaskHashBuckets = numHashBuckets - 1 + percentageScaleFactor = numHashBuckets / 100.0 +) + +// SamplerMode controls the logic used in making a sampling decision. +// The HashSeed mode is the only mode, presently, and it is also the +// default mode. +// +// TODO: In the future, when OTEP 235 is introduced, there will be two +// new modes. +type SamplerMode string + +const ( + HashSeed SamplerMode = "hash_seed" + DefaultMode SamplerMode = HashSeed + modeUnset SamplerMode = "" +) + +// ErrMissingRandomness indicates no randomness source was found.
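+// Randomness is missing, for example, when a span has an invalid
+// (all-zeros) TraceID, or when a log record has neither a valid
+// TraceID nor a non-empty randomness-source attribute value.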
+var ErrMissingRandomness = errors.New("missing randomness") + +type randomnessNamer interface { + randomness() sampling.Randomness + policyName() string +} + +type randomnessMethod sampling.Randomness + +func (rm randomnessMethod) randomness() sampling.Randomness { + return sampling.Randomness(rm) +} + +type traceIDHashingMethod struct{ randomnessMethod } +type samplingPriorityMethod struct{ randomnessMethod } + +type missingRandomnessMethod struct{} + +func (rm missingRandomnessMethod) randomness() sampling.Randomness { + return sampling.AllProbabilitiesRandomness +} + +func (missingRandomnessMethod) policyName() string { + return "missing_randomness" +} + +type attributeHashingMethod struct { + randomnessMethod + attribute string +} + +func (am attributeHashingMethod) policyName() string { + return am.attribute +} + +func (traceIDHashingMethod) policyName() string { + return "trace_id_hash" +} + +func (samplingPriorityMethod) policyName() string { + return "sampling_priority" +} + +var _ randomnessNamer = missingRandomnessMethod{} +var _ randomnessNamer = traceIDHashingMethod{} +var _ randomnessNamer = samplingPriorityMethod{} + +func newMissingRandomnessMethod() randomnessNamer { + return missingRandomnessMethod{} +} + +func isMissing(rnd randomnessNamer) bool { + _, ok := rnd.(missingRandomnessMethod) + return ok +} + +func newTraceIDHashingMethod(rnd sampling.Randomness) randomnessNamer { + return traceIDHashingMethod{randomnessMethod(rnd)} +} + +func newSamplingPriorityMethod(rnd sampling.Randomness) randomnessNamer { + return samplingPriorityMethod{randomnessMethod(rnd)} +} + +func newAttributeHashingMethod(attribute string, rnd sampling.Randomness) randomnessNamer { + return attributeHashingMethod{ + randomnessMethod: randomnessMethod(rnd), + attribute: attribute, + } +} + +// TODO: Placeholder interface, see #31894 for its future contents, +// will become a non-empty interface. (Linter forces us to write "any".) +type samplingCarrier any + +type dataSampler interface { + // decide reports the result based on a probabilistic decision. + decide(carrier samplingCarrier) sampling.Threshold + + // randomnessFromSpan extracts randomness and returns a carrier specific to traces data. + randomnessFromSpan(s ptrace.Span) (randomness randomnessNamer, carrier samplingCarrier, err error) + + // randomnessFromLogRecord extracts randomness and returns a carrier specific to logs data. + randomnessFromLogRecord(s plog.LogRecord) (randomness randomnessNamer, carrier samplingCarrier, err error) +} + +var AllModes = []SamplerMode{HashSeed} + +func (sm *SamplerMode) UnmarshalText(in []byte) error { + switch mode := SamplerMode(in); mode { + case HashSeed, + modeUnset: + *sm = mode + return nil + default: + return fmt.Errorf("unsupported sampler mode %q", mode) + } +} + +// hashingSampler is the original hash-based calculation. It is an +// equalizing sampler with randomness calculation that matches the +// original implementation. This hash-based implementation is limited +// to 14 bits of precision. +type hashingSampler struct { + hashSeed uint32 + tvalueThreshold sampling.Threshold + + // Logs only: name of attribute to obtain randomness + logsRandomnessSourceAttribute string + + // Logs only: whether the TraceID is used to obtain randomness + logsTraceIDEnabled bool +} + +func (th *hashingSampler) decide(_ samplingCarrier) sampling.Threshold { + return th.tvalueThreshold +} + +// neverSampler always decides false.
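+// makeSampler returns a neverSampler when the configured sampling
+// percentage is zero or small enough to round to zero in the 14-bit
+// hash space.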
+type neverSampler struct { +} + +func (*neverSampler) decide(_ samplingCarrier) sampling.Threshold { + return sampling.NeverSampleThreshold +} + +func getBytesFromValue(value pcommon.Value) []byte { + if value.Type() == pcommon.ValueTypeBytes { + return value.Bytes().AsRaw() + } + return []byte(value.AsString()) +} + +func randomnessFromBytes(b []byte, hashSeed uint32) sampling.Randomness { + hashed32 := computeHash(b, hashSeed) + hashed := uint64(hashed32 & bitMaskHashBuckets) + + // Ordinarily, hashed is compared against an acceptance + // threshold i.e., sampled when hashed < scaledSamplerate, + // which has the form R < T with T in [1, 2^14] and + // R in [0, 2^14-1]. + // + // Here, modify R to R' and T to T', so that the sampling + // equation has identical form to the specification, i.e., T' + // <= R', using: + // + // T' = numHashBuckets-T + // R' = numHashBuckets-1-R + // + // As a result, R' has the correct most-significant 14 bits to + // use in an R-value. + rprime14 := numHashBuckets - 1 - hashed + + // There are 18 unused bits from the FNV hash function. + unused18 := uint64(hashed32 >> (32 - numHashBucketsLg2)) + mixed28 := unused18 ^ (unused18 << 10) + + // The 56 bit quantity here consists of, most- to least-significant: + // - 14 bits: R' = numHashBuckets - 1 - hashed + // - 28 bits: mixture of unused 18 bits + // - 14 bits: original `hashed`. + rnd56 := (rprime14 << 42) | (mixed28 << 14) | hashed + + // Note: by construction: + // - OTel samplers make the same probabilistic decision with this r-value, + // - only 14 out of 56 bits are used in the sampling decision, + // - there are only 32 actual random bits. + rnd, _ := sampling.UnsignedToRandomness(rnd56) + return rnd +} + +// consistencyCheck checks for certain inconsistent inputs. +// +// If the randomness is missing, it returns ErrMissingRandomness. +func consistencyCheck(rnd randomnessNamer, _ samplingCarrier) error { + if isMissing(rnd) { + return ErrMissingRandomness + } + return nil +} + +// makeSampler constructs a sampler. There are no errors, as the only +// potential error, out-of-range probability, is corrected automatically +// according to the README, which allows percents >100 to equal 100%. +// +// Extending this logic, we round very small probabilities up to the +// minimum supported value(s), which vary according to sampler mode. +func makeSampler(cfg *Config) dataSampler { + // README allows percents >100 to equal 100%. + pct := cfg.SamplingPercentage + if pct > 100 { + pct = 100 + } + + never := &neverSampler{} + + if pct == 0 { + return never + } + + // Note: the original hash function used in this code + // is preserved to ensure consistency across updates. + // + // uint32(pct * percentageScaleFactor) + // + // (a) carried out the multiplication in 32-bit precision + // (b) rounded to zero instead of nearest. + scaledSampleRate := uint32(pct * percentageScaleFactor) + + if scaledSampleRate == 0 { + return never + } + + // Convert the accept threshold to a reject threshold, + // then shift it into a 56-bit value. + reject := numHashBuckets - scaledSampleRate + reject56 := uint64(reject) << 42 + + threshold, _ := sampling.UnsignedToThreshold(reject56) + + return &hashingSampler{ + tvalueThreshold: threshold, + hashSeed: cfg.HashSeed, + + // Logs specific: + logsTraceIDEnabled: cfg.AttributeSource == traceIDAttributeSource, + logsRandomnessSourceAttribute: cfg.FromAttribute, + } +} + +// randFunc returns randomness (w/ named policy), a carrier, and the error.
+type randFunc[T any] func(T) (randomnessNamer, samplingCarrier, error) + +// priorityFunc makes changes resulting from sampling priority. +type priorityFunc[T any] func(T, randomnessNamer, sampling.Threshold) (randomnessNamer, sampling.Threshold) + +// commonShouldSampleLogic implements sampling on a per-item basis +// independent of the signal type, as embodied in the functional +// parameters. +func commonShouldSampleLogic[T any]( + ctx context.Context, + item T, + sampler dataSampler, + failClosed bool, + randFunc randFunc[T], + priorityFunc priorityFunc[T], + description string, + logger *zap.Logger, +) bool { + rnd, carrier, err := randFunc(item) + if err == nil { + err = consistencyCheck(rnd, carrier) + } + var threshold sampling.Threshold + if err != nil { + logger.Debug(description, zap.Error(err)) + if failClosed { + threshold = sampling.NeverSampleThreshold + } else { + threshold = sampling.AlwaysSampleThreshold + } + } else { + threshold = sampler.decide(carrier) + } + + rnd, threshold = priorityFunc(item, rnd, threshold) + + sampled := threshold.ShouldSample(rnd.randomness()) + + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(tagPolicyKey, rnd.policyName()), tag.Upsert(tagSampledKey, strconv.FormatBool(sampled))}, + statCountTracesSampled.M(int64(1)), + ) + + return sampled +} diff --git a/processor/probabilisticsamplerprocessor/sampler_mode_test.go b/processor/probabilisticsamplerprocessor/sampler_mode_test.go new file mode 100644 index 000000000000..170da3ed6d44 --- /dev/null +++ b/processor/probabilisticsamplerprocessor/sampler_mode_test.go @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package probabilisticsamplerprocessor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestUnmarshalText(t *testing.T) { + tests := []struct { + samplerMode string + shouldError bool + }{ + { + samplerMode: "hash_seed", + }, + { + samplerMode: "", + }, + { + samplerMode: "dunno", + shouldError: true, + }, + } + for _, tt := range tests { + t.Run(tt.samplerMode, func(t *testing.T) { + temp := modeUnset + err := temp.UnmarshalText([]byte(tt.samplerMode)) + if tt.shouldError { + assert.Error(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, temp, SamplerMode(tt.samplerMode)) + }) + } +} diff --git a/processor/probabilisticsamplerprocessor/testdata/config.yaml b/processor/probabilisticsamplerprocessor/testdata/config.yaml index a834def5d98c..0e853f77cbe3 100644 --- a/processor/probabilisticsamplerprocessor/testdata/config.yaml +++ b/processor/probabilisticsamplerprocessor/testdata/config.yaml @@ -11,15 +11,6 @@ processors: # zero, i.e.: no sample. Values greater or equal 100 are treated as # "sample all traces". sampling_percentage: 15.3 - # hash_seed allows one to configure the hashing seed. This is important in - # scenarios where multiple layers of collectors are used to achieve the - # desired sampling rate, eg.: 10% on first layer and 10% on the - # second, resulting in an overall sampling rate of 1% (10% x 10%). - # If all layers use the same seed, all data passing one layer will also pass - # the next one, independent of the configured sampling rate. Having different - # seeds at different layers ensures that sampling rate in each layer work as - # intended. - hash_seed: 22 probabilistic_sampler/logs: # the percentage rate at which logs are going to be sampled.
Defaults to @@ -34,6 +25,8 @@ processors: # the next one, independent of the configured sampling rate. Having different # seeds at different layers ensures that sampling rate in each layer work as # intended. + # + # setting the hash_seed != 0 causes hash_seed to be selected by default hash_seed: 22 # attribute_source defines where to look for the attribute in from_attribute. The allowed values are `traceID` or `record`. attribute_source: "record" diff --git a/processor/probabilisticsamplerprocessor/testdata/invalid.yaml b/processor/probabilisticsamplerprocessor/testdata/invalid_negative.yaml similarity index 59% rename from processor/probabilisticsamplerprocessor/testdata/invalid.yaml rename to processor/probabilisticsamplerprocessor/testdata/invalid_negative.yaml index ffd9b1e07d16..13f36d8da540 100644 --- a/processor/probabilisticsamplerprocessor/testdata/invalid.yaml +++ b/processor/probabilisticsamplerprocessor/testdata/invalid_negative.yaml @@ -3,16 +3,15 @@ receivers: processors: - probabilistic_sampler/logs: + probabilistic_sampler/traces: sampling_percentage: -15.3 - hash_seed: 22 exporters: nop: service: pipelines: - logs: + traces: receivers: [ nop ] - processors: [ probabilistic_sampler/logs ] + processors: [ probabilistic_sampler/traces ] exporters: [ nop ] diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor.go b/processor/probabilisticsamplerprocessor/tracesprocessor.go index 96224e6f5704..cafa25589d20 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor.go @@ -7,14 +7,14 @@ import ( "context" "strconv" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) // samplingPriority has the semantic result of parsing the "sampling.priority" @@ -34,74 +34,76 @@ const ( // equal zero and it is NOT going to be sampled, ie.: it won't be forwarded // by the collector. doNotSampleSpan - - // The constants help translate user friendly percentages to numbers direct used in sampling. - numHashBuckets = 0x4000 // Using a power of 2 to avoid division. - bitMaskHashBuckets = numHashBuckets - 1 - percentageScaleFactor = numHashBuckets / 100.0 ) -type traceSamplerProcessor struct { - scaledSamplingRate uint32 - hashSeed uint32 - logger *zap.Logger +type traceProcessor struct { + sampler dataSampler + failClosed bool + logger *zap.Logger +} + +// tracestateCarrier conveys information about sampled spans between +// the call to parse incoming randomness/threshold and the call to +// decide. +type tracestateCarrier struct { + span ptrace.Span +} + +var _ samplingCarrier = &tracestateCarrier{} + +func newTracestateCarrier(s ptrace.Span) samplingCarrier { + return &tracestateCarrier{ + span: s, + } } -// newTracesProcessor returns a processor.TracesProcessor that will perform head sampling according to the given +// newTracesProcessor returns a processor.TracesProcessor that will +// perform intermediate span sampling according to the given // configuration. 
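+// Spans are sampled independently of one another; for sampling
+// decisions made over whole traces, see the tailsamplingprocessor.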
-// newTracesProcessor returns a processor.TracesProcessor that will perform head sampling according to the given
+// newTracesProcessor returns a processor.Traces that will
+// perform intermediate span sampling according to the given
 // configuration.
 func newTracesProcessor(ctx context.Context, set processor.CreateSettings, cfg *Config, nextConsumer consumer.Traces) (processor.Traces, error) {
-	tsp := &traceSamplerProcessor{
-		// Adjust sampling percentage on private so recalculations are avoided.
-		scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor),
-		hashSeed:           cfg.HashSeed,
-		logger:             set.Logger,
+	tp := &traceProcessor{
+		sampler:    makeSampler(cfg),
+		failClosed: cfg.FailClosed,
+		logger:     set.Logger,
 	}
-
 	return processorhelper.NewTracesProcessor(
 		ctx,
 		set,
 		cfg,
 		nextConsumer,
-		tsp.processTraces,
+		tp.processTraces,
 		processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}))
 }
 
-func (tsp *traceSamplerProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
+func (th *neverSampler) randomnessFromSpan(_ ptrace.Span) (randomnessNamer, samplingCarrier, error) {
+	// We return a fake randomness value, since it will not be used.
+	// This avoids a consistency check error for missing randomness.
+	return newSamplingPriorityMethod(sampling.AllProbabilitiesRandomness), nil, nil
+}
+
+func (th *hashingSampler) randomnessFromSpan(s ptrace.Span) (randomnessNamer, samplingCarrier, error) {
+	tid := s.TraceID()
+	tsc := newTracestateCarrier(s)
+	rnd := newMissingRandomnessMethod()
+	if !tid.IsEmpty() {
+		rnd = newTraceIDHashingMethod(randomnessFromBytes(tid[:], th.hashSeed))
+	}
+	return rnd, tsc, nil
+}
+
+func (tp *traceProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
 	td.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool {
 		rs.ScopeSpans().RemoveIf(func(ils ptrace.ScopeSpans) bool {
 			ils.Spans().RemoveIf(func(s ptrace.Span) bool {
-				sp := parseSpanSamplingPriority(s)
-				if sp == doNotSampleSpan {
-					// The OpenTelemetry mentions this as a "hint" we take a stronger
-					// approach and do not sample the span since some may use it to
-					// remove specific spans from traces.
-					_ = stats.RecordWithTags(
-						ctx,
-						[]tag.Mutator{tag.Upsert(tagPolicyKey, "sampling_priority"), tag.Upsert(tagSampledKey, "false")},
-						statCountTracesSampled.M(int64(1)),
-					)
-					return true
-				}
-
-				_ = stats.RecordWithTags(
-					ctx,
-					[]tag.Mutator{tag.Upsert(tagPolicyKey, "sampling_priority"), tag.Upsert(tagSampledKey, "true")},
-					statCountTracesSampled.M(int64(1)),
-				)
-
-				// If one assumes random trace ids hashing may seems avoidable, however, traces can be coming from sources
-				// with various different criteria to generate trace id and perhaps were already sampled without hashing.
-				// Hashing here prevents bias due to such systems.
-				tidBytes := s.TraceID()
-				sampled := sp == mustSampleSpan ||
-					computeHash(tidBytes[:], tsp.hashSeed)&bitMaskHashBuckets < tsp.scaledSamplingRate
-
-				_ = stats.RecordWithTags(
+				return !commonShouldSampleLogic(
 					ctx,
-					[]tag.Mutator{tag.Upsert(tagPolicyKey, "trace_id_hash"), tag.Upsert(tagSampledKey, strconv.FormatBool(sampled))},
-					statCountTracesSampled.M(int64(1)),
+					s,
+					tp.sampler,
+					tp.failClosed,
+					tp.sampler.randomnessFromSpan,
+					tp.priorityFunc,
+					"traces sampler",
+					tp.logger,
 				)
-				return !sampled
 			})
 			// Filter out empty ScopeSpans
 			return ils.Spans().Len() == 0
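To make the refactor easier to follow: the removed branch above condensed to a single acceptance test built from the deleted constants (numHashBuckets = 0x4000, i.e. 2^14 buckets). A sketch for reference only, not code in the patch; computeHash is the processor's existing seeded hash function:

```go
// legacyAccept restates the removed 14-bit acceptance-threshold test:
// keep the span when its masked hash falls below the scaled percentage.
func legacyAccept(tid [16]byte, hashSeed uint32, pct float32) bool {
	const (
		numHashBuckets        = 0x4000 // 2^14; a power of 2 avoids division
		bitMaskHashBuckets    = numHashBuckets - 1
		percentageScaleFactor = numHashBuckets / 100.0
	)
	scaled := uint32(pct * percentageScaleFactor)
	return computeHash(tid[:], hashSeed)&bitMaskHashBuckets < scaled
}
```

Reframing this acceptance test as a 56-bit rejection threshold is what lets the same decision be expressed with the sampling package's Threshold and Randomness types.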
@@ -115,6 +117,25 @@ func (tsp *traceSamplerProcessor) processTraces(ctx context.Context, td ptrace.T
 	return td, nil
 }
 
+func (tp *traceProcessor) priorityFunc(s ptrace.Span, rnd randomnessNamer, threshold sampling.Threshold) (randomnessNamer, sampling.Threshold) {
+	switch parseSpanSamplingPriority(s) {
+	case doNotSampleSpan:
+		// OpenTracing mentions this as a "hint". We take a stronger
+		// approach and do not sample the span since some may use it to
+		// remove specific spans from traces.
+		threshold = sampling.NeverSampleThreshold
+		rnd = newSamplingPriorityMethod(rnd.randomness()) // override policy name
+	case mustSampleSpan:
+		threshold = sampling.AlwaysSampleThreshold
+		rnd = newSamplingPriorityMethod(rnd.randomness()) // override policy name
+	case deferDecision:
+		// Note that the logs processor has very different logic here;
+		// in tracing, the priority can only force the decision to never
+		// or always.
+	}
+	return rnd, threshold
+}
+
 // parseSpanSamplingPriority checks if the span has the "sampling.priority" tag to
 // decide if the span should be sampled or not. The usage of the tag follows the
 // OpenTracing semantic tags:
diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go
index 2799f55360b7..4166e347e477 100644
--- a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go
+++ b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go
@@ -6,6 +6,7 @@ package probabilisticsamplerprocessor
 import (
 	"context"
 	"encoding/hex"
+	"fmt"
 	"math"
 	"math/rand"
 	"testing"
@@ -18,10 +19,35 @@ import (
 	"go.opentelemetry.io/collector/pdata/ptrace"
 	"go.opentelemetry.io/collector/processor/processortest"
 	conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest/observer"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/idutils"
 )
 
+// defaultHashSeed is used throughout to ensure that the HashSeed is real
+// and does not fall back to proportional-mode sampling due to HashSeed == 0.
+const defaultHashSeed = 4312
+
+func TestHashBucketsLog2(t *testing.T) {
+	require.Equal(t, numHashBuckets, 1<<numHashBucketsLg2)
+}
@@ -176,6 +206,8 @@ func Test_tracesamplerprocessor_SamplingPercentageRange_MultipleResourceSpans(t
 	const testSvcName = "test-svc"
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			tt.cfg.HashSeed = defaultHashSeed
+
 			sink := new(consumertest.TracesSink)
 			tsp, err := newTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), tt.cfg, sink)
 			if err != nil {
@@ -193,6 +225,74 @@ func Test_tracesamplerprocessor_SamplingPercentageRange_MultipleResourceSpans(t
 	}
 }
 
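As the deferDecision comment in priorityFunc above notes, the logs processor treats priority very differently: a numeric priority acts as a sampling percentage rather than a boolean override. A rough contrast, assuming the sampling package's ProbabilityToThreshold helper; the actual logs code lives in logsprocessor.go and differs in detail:

```go
// logsPrioritySketch recomputes the threshold from a priority value
// interpreted as a percentage, e.g. priority 1 means 1% sampling.
// Illustrative only; not the logs processor's exact logic.
func logsPrioritySketch(priorityPct float64, configured sampling.Threshold) sampling.Threshold {
	th, err := sampling.ProbabilityToThreshold(priorityPct / 100.0)
	if err != nil {
		// Out-of-range priority: fall back to the configured threshold.
		return configured
	}
	return th
}
```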
+func Test_tracessamplerprocessor_MissingRandomness(t *testing.T) {
+	type test struct {
+		pct        float32
+		failClosed bool
+		sampled    bool
+	}
+
+	for _, tt := range []test{
+		// When the TraceID is empty and failClosed==true, the span is not sampled.
+		{0, true, false},
+		{62, true, false},
+		{100, true, false},
+
+		// When the TraceID is empty and failClosed==false, the span is sampled when pct != 0.
+		{0, false, false},
+		{62, false, true},
+		{100, false, true},
+	} {
+		t.Run(fmt.Sprint(tt.pct, "_", tt.failClosed), func(t *testing.T) {
+			ctx := context.Background()
+			traces := ptrace.NewTraces()
+			span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
+			span.SetTraceID(pcommon.TraceID{})                      // invalid TraceID
+			span.SetSpanID(pcommon.SpanID{1, 2, 3, 4, 5, 6, 7, 8}) // valid SpanID
+			span.SetName("testing")
+
+			cfg := &Config{
+				SamplingPercentage: tt.pct,
+				HashSeed:           defaultHashSeed,
+				FailClosed:         tt.failClosed,
+			}
+
+			sink := new(consumertest.TracesSink)
+
+			set := processortest.NewNopCreateSettings()
+			// Note: we expect a debug-level log when FailClosed causes a drop.
+			logger, observed := observer.New(zap.DebugLevel)
+			set.Logger = zap.New(logger)
+
+			tsp, err := newTracesProcessor(ctx, set, cfg, sink)
+			require.NoError(t, err)
+
+			err = tsp.ConsumeTraces(ctx, traces)
+			require.NoError(t, err)
+
+			sampledData := sink.AllTraces()
+			if tt.sampled {
+				require.Equal(t, 1, len(sampledData))
+				assert.Equal(t, 1, sink.SpanCount())
+			} else {
+				require.Equal(t, 0, len(sampledData))
+				assert.Equal(t, 0, sink.SpanCount())
+			}
+
+			if tt.pct != 0 {
+				// pct==0 bypasses the randomness check, so the
+				// missing-randomness debug log appears only when pct != 0.
+				require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All())
+				require.Contains(t, observed.All()[0].Message, "traces sampler")
+				require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), "missing randomness")
+			} else {
+				require.Equal(t, 0, len(observed.All()), "should have no logs: %v", observed.All())
+			}
+		})
+	}
+}
+
 // Test_tracesamplerprocessor_SpanSamplingPriority checks if handling of "sampling.priority" is correct.
 func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
 	singleSpanWithAttrib := func(key string, attribValue pcommon.Value) ptrace.Traces {
@@ -285,8 +385,12 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			sink := new(consumertest.TracesSink)
-			tsp, err := newTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), tt.cfg, sink)
+			cfg := *tt.cfg
+			cfg.HashSeed = defaultHashSeed
+			tsp, err := newTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), &cfg, sink)
 			require.NoError(t, err)
 
 			err = tsp.ConsumeTraces(context.Background(), tt.td)
@@ -394,6 +498,11 @@ func getSpanWithAttributes(key string, value pcommon.Value) ptrace.Span {
 func initSpanWithAttribute(key string, value pcommon.Value, dest ptrace.Span) {
 	dest.SetName("spanName")
 	value.CopyTo(dest.Attributes().PutEmpty(key))
+
+	// Ensure a non-empty trace ID with a deterministic value, one that has
+	// all zero bits in the W3C randomness portion. Under the OTel
+	// specification this value has R-value 0 and is sampled only at 100%.
+	dest.SetTraceID(pcommon.TraceID{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})
 }
 
 // genRandomTestData generates a slice of ptrace.Traces with the numBatches elements which one with
@@ -428,10 +537,40 @@ func genRandomTestData(numBatches, numTracesPerBatch int, serviceName string, re
 	return traceBatches
 }
 
-// assertSampledData checks for no repeated traceIDs and counts the number of spans on the sampled data for
+// assertTraces is a consumer.Traces that asserts on the traces it receives.
+type assertTraces struct {
+	*testing.T
+	testName  string
+	traceIDs  map[[16]byte]bool
+	spanCount int
+}
+
+var _ consumer.Traces = &assertTraces{}
+
+func (a *assertTraces) Capabilities() consumer.Capabilities {
+	return consumer.Capabilities{}
+}
+
+func (a *assertTraces) ConsumeTraces(_ context.Context, data ptrace.Traces) error {
+	tt := [1]ptrace.Traces{
+		data,
+	}
+	a.onSampledData(tt[:])
+	return nil
+}
+
+func newAssertTraces(t *testing.T, name string) *assertTraces {
+	return &assertTraces{
+		T:         t,
+		testName:  name,
+		traceIDs:  map[[16]byte]bool{},
+		spanCount: 0,
+	}
+}
+
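A hypothetical usage of the helper above (not a test in this patch, and assuming the imports already in this file): assertTraces can replace a plain consumertest.TracesSink so duplicate trace IDs are flagged as data flows through the processor.

```go
func exampleAssertTracesUsage(t *testing.T) {
	cfg := &Config{SamplingPercentage: 50, HashSeed: defaultHashSeed}
	sink := newAssertTraces(t, "test-svc")
	tsp, err := newTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, sink)
	require.NoError(t, err)

	td := ptrace.NewTraces()
	rs := td.ResourceSpans().AppendEmpty()
	rs.Resource().Attributes().PutStr("service.name", "test-svc")
	span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty()
	span.SetName("example")
	span.SetTraceID(pcommon.TraceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})

	require.NoError(t, tsp.ConsumeTraces(context.Background(), td))
	t.Log("spans kept:", sink.spanCount)
}
```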
-func assertSampledData(t *testing.T, sampled []ptrace.Traces, serviceName string) (traceIDs map[[16]byte]bool, spanCount int) { - traceIDs = make(map[[16]byte]bool) +func (a *assertTraces) onSampledData(sampled []ptrace.Traces) { for _, td := range sampled { rspans := td.ResourceSpans() for i := 0; i < rspans.Len(); i++ { @@ -439,23 +578,22 @@ func assertSampledData(t *testing.T, sampled []ptrace.Traces, serviceName string ilss := rspan.ScopeSpans() for j := 0; j < ilss.Len(); j++ { ils := ilss.At(j) - if svcNameAttr, _ := rspan.Resource().Attributes().Get("service.name"); svcNameAttr.Str() != serviceName { + if svcNameAttr, _ := rspan.Resource().Attributes().Get("service.name"); svcNameAttr.Str() != a.testName { continue } for k := 0; k < ils.Spans().Len(); k++ { - spanCount++ + a.spanCount++ span := ils.Spans().At(k) key := span.TraceID() - if traceIDs[key] { - t.Errorf("same traceID used more than once %q", key) + if a.traceIDs[key] { + a.Errorf("same traceID used more than once %q", key) return } - traceIDs[key] = true + a.traceIDs[key] = true } } } } - return } // mustParseTID generates TraceIDs from their hex encoding, for