From 590d50c937506b28f35aef21c8be847ff03e50a5 Mon Sep 17 00:00:00 2001 From: Otmar Ertl Date: Fri, 27 May 2022 09:54:15 +0200 Subject: [PATCH 1/3] upgraded to hipparchus 2.1 --- consistent-sampling/build.gradle.kts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consistent-sampling/build.gradle.kts b/consistent-sampling/build.gradle.kts index 1ad1faad1..bdeaaf542 100644 --- a/consistent-sampling/build.gradle.kts +++ b/consistent-sampling/build.gradle.kts @@ -7,6 +7,6 @@ description = "Sampler and exporter implementations for consistent sampling" dependencies { api("io.opentelemetry:opentelemetry-sdk-trace") - testImplementation("org.hipparchus:hipparchus-core:2.0") - testImplementation("org.hipparchus:hipparchus-stat:2.0") + testImplementation("org.hipparchus:hipparchus-core:2.1") + testImplementation("org.hipparchus:hipparchus-stat:2.1") } From 172e6cc7c8860722d07e562ee5aa667aac22487d Mon Sep 17 00:00:00 2001 From: Otmar Ertl Date: Fri, 27 May 2022 16:35:46 +0200 Subject: [PATCH 2/3] consistent reservoir sampling span processor --- ...sistentReservoirSamplingSpanProcessor.java | 564 ++++++++++++++ .../contrib/samplers/RandomGenerator.java | 91 +++ ...entReservoirSamplingSpanProcessorTest.java | 689 ++++++++++++++++++ .../contrib/samplers/RandomGeneratorTest.java | 78 ++ .../opentelemetry/contrib/util/TestUtil.java | 77 ++ 5 files changed, 1499 insertions(+) create mode 100644 consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessor.java create mode 100644 consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessorTest.java create mode 100644 consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/RandomGeneratorTest.java create mode 100644 consistent-sampling/src/test/java/io/opentelemetry/contrib/util/TestUtil.java diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessor.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessor.java new file mode 100644 index 000000000..bcda18e62 --- /dev/null +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessor.java @@ -0,0 +1,564 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static io.opentelemetry.api.internal.Utils.checkArgument; +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.internal.DaemonThreadFactory; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.DelegatingSpanData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.List; +import java.util.PriorityQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * A {@link SpanProcessor} which periodically exports a fixed maximum number of spans. If the number + * of spans in a period exceeds the fixed reservoir (buffer) size, spans will be consistently + * (compare {@link ConsistentSampler}) sampled. + */ +public final class ConsistentReservoirSamplingSpanProcessor implements SpanProcessor { + + private static final String WORKER_THREAD_NAME = + ConsistentReservoirSamplingSpanProcessor.class.getSimpleName() + "_WorkerThread"; + + private final Worker worker; + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + + public static final long DEFAULT_EXPORT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30); + + private static final class ReadableSpanWithPriority { + + private final ReadableSpan readableSpan; + private int pval; + private final int rval; + private final long priority; + + public static ReadableSpanWithPriority create( + ReadableSpan readableSpan, RandomGenerator randomGenerator) { + String otelTraceStateString = + readableSpan.getSpanContext().getTraceState().get(OtelTraceState.TRACE_STATE_KEY); + OtelTraceState otelTraceState = OtelTraceState.parse(otelTraceStateString); + int pval; + int rval; + long priority = randomGenerator.nextLong(); + if (otelTraceState.hasValidR()) { + rval = otelTraceState.getR(); + } else { + rval = + Math.min(randomGenerator.numberOfLeadingZerosOfRandomLong(), OtelTraceState.getMaxR()); + } + + if (otelTraceState.hasValidP()) { + pval = otelTraceState.getP(); + } else { + // if the p-value is not defined assume it is zero, + // which corresponds to an adjusted count of 1 + pval = 0; + } + + return new ReadableSpanWithPriority(readableSpan, pval, rval, priority); + } + + private ReadableSpanWithPriority(ReadableSpan readableSpan, int pval, int rval, long priority) { + this.readableSpan = readableSpan; + this.pval = pval; + this.rval = rval; + this.priority = priority; + } + + private ReadableSpan getReadableSpan() { + return readableSpan; + } + + private int getP() { + return pval; + } + + private void setP(int pval) { + this.pval = pval; + } + + private int getR() { + return rval; + } + + private static int compareRthenPriority( + ReadableSpanWithPriority s1, ReadableSpanWithPriority s2) { + int compareR = Integer.compare(s1.rval, s2.rval); + if (compareR != 0) { + return compareR; + } + return Long.compare(s1.priority, s2.priority); + } + } + + /** + * A reservoir sampling buffer that collects a fixed number of spans. + * + *

Consistent sampling requires that spans are sampled only if r-value >= p-value, where + * p-value describes which sampling rate from the discrete set of possible sampling rates is + * applied. Consistent sampling allows to choose the sampling rate (the p-value) individually for + * every span. Therefore, the number of sampled spans can be reduced by increasing the p-value of + * spans, such that spans for which r-value < p-value get discarded. To reduce the number of + * sampled spans one can therefore apply the following procedure until the desired number of spans + * are left: + * + *

1) Randomly choose a span among the spans with smallest p-values + * + *

2) Increment its p-value by 1 + * + *

3) Discard the span, if r-value < p-value + * + *

4) continue with 1) + * + *

By always incrementing one of the smallest p-values, this approach tries to balance the + * sampling rates (p-values). Balanced sampling rates are better for estimation (compare VarOpt sampling). + * + *

This sampling approach can be implemented in a streaming fashion. In order to ensure that + * spans have fair chances regardless of processing order, a uniform random number (priority) is + * associated with its p-value. When choosing a span among all spans with smallest p-value, we + * take that with the smallest priority. For that, a priority queue is needed. + * + *

In the following, an equivalent and more efficient sampling approach is described, that is + * based on a priority queue where the minimum is the span with the smallest r-value. In this way + * the {@code add}-operation will have a worst case time complexity of {@code O(log n)} where + * {@code n} denotes the reservoir size. We use the following notation: + * + *

Z := {@code reservoirSize} + * + *

L := {@code maxDiscardedRValue} + * + *

R := {@code numberOfDiscardedSpansWithMaxDiscardedRValue} + * + *

K := {@code numSampledSpansWithGreaterRValueAndSmallPValue} + * + *

X := {@code numberOfSampledSpansWithMaxDiscardedRValue} + * + *

The sampling approach described above can be equivalently performed by keeping Z spans with + * largest r-values (in case of ties with highest priority) and adjusting the corresponding + * p-values in a finalization step. We know that the largest r-value among the dropped spans is L + * and that we had to discard exactly R spans with (r-value == L). This implies that their + * corresponding p-values were raised to (L + 1) which finally violated the sampling condition + * (r-value >= p-value). We only raise the p-value of some span, if it belongs to the set of spans + * with minimum p-value. Therefore, the minimum p-value must be given by L. To determine the + * p-values of all finally kept spans, we consider 3 cases: + * + *

1) For all X kept spans with r-value == L the corresponding p-value must also be L. + * Otherwise, the span would have been discarded. There are R spans with (r-value == L) which have + * been discarded. Therefore, among the original (X + R) spans with (r-value == L) we have kept X + * spans. + * + *

2) For spans with (p-value > L) the p-value will not be changed as they do not belong to the + * set of spans with minimal p-values. + * + *

3) For the remaining K spans for which (r-value > L) and (p-value <= L) the p-value needs to + * be adjusted. The new p-value will be either L or (L + 1). When starting to sample the first + * spans with (p-value == L), we have N = R + K + X spans which all have (r-value >= L) and + * (p-value == L). This set can be divided into two sets of spans dependent on whether (r-value == + * L) or (r-value > L). We know that there were (R + X) spans with (r-value == L) and K spans with + * (r-value > L). When randomly selecting a span to increase its p-value, the span will only be + * discarded if the span belongs to the first set (r-value == L). We will call such an event + * "failure". If the selected span belongs to the second set (r-value > L), its p-value will be + * increased by 1 to (L + 1) but the span will not be dropped. The sampling procedure will be + * stopped after R "failures". The number of "successes" follows a negative + * hypergeometric distribution. Therefore, we need to sample a random value from a negative + * hypergeometric distribution with N = R + X + K elements of which K are "successes" and after + * drawing R "failures", in order to determine how many spans out of K will get a p-value equal to + * (L + 1). The expected number is given by R * K / (N - K + 1) = R * K / (R + X + 1). Instead of + * drawing the number from the negative hypergeometric distribution we could also set it to the + * stochastically rounded expected value. This makes this reservoir sampling approach not fully + * equivalent to the approach described initially, but leads to a smaller variance when + * estimating. + */ + private static final class Reservoir { + private final int reservoirSize; + private int maxDiscardedRValue = 0; + private long numberOfDiscardedSpansWithMaxDiscardedRValue = 0; + private final PriorityQueue queue; + private final RandomGenerator randomGenerator; + + public Reservoir(int reservoirSize, RandomGenerator randomGenerator) { + if (reservoirSize < 1) { + throw new IllegalArgumentException(); + } + this.reservoirSize = reservoirSize; + this.queue = + new PriorityQueue<>(reservoirSize, ReadableSpanWithPriority::compareRthenPriority); + this.randomGenerator = randomGenerator; + } + + public void add(ReadableSpanWithPriority readableSpanWithPriority) { + + if (queue.size() < reservoirSize) { + queue.add(readableSpanWithPriority); + return; + } + + ReadableSpanWithPriority head = queue.peek(); + if (ReadableSpanWithPriority.compareRthenPriority(readableSpanWithPriority, head) > 0) { + queue.remove(); + queue.add(readableSpanWithPriority); + readableSpanWithPriority = head; + } + if (readableSpanWithPriority.getR() > maxDiscardedRValue) { + maxDiscardedRValue = readableSpanWithPriority.getR(); + numberOfDiscardedSpansWithMaxDiscardedRValue = 1; + } else if (readableSpanWithPriority.getR() == maxDiscardedRValue) { + numberOfDiscardedSpansWithMaxDiscardedRValue += 1; + } + } + + public List getResult() { + + if (numberOfDiscardedSpansWithMaxDiscardedRValue == 0) { + return queue.stream().map(x -> x.readableSpan.toSpanData()).collect(Collectors.toList()); + } + + List readableSpansWithPriority = new ArrayList<>(queue.size()); + int numberOfSampledSpansWithMaxDiscardedRValue = 0; + int numSampledSpansWithGreaterRValueAndSmallPValue = 0; + for (ReadableSpanWithPriority readableSpanWithPriority : queue) { + if (readableSpanWithPriority.getR() == maxDiscardedRValue) { + numberOfSampledSpansWithMaxDiscardedRValue += 1; + } else if (readableSpanWithPriority.getP() <= maxDiscardedRValue) { + numSampledSpansWithGreaterRValueAndSmallPValue += 1; + } + readableSpansWithPriority.add(readableSpanWithPriority); + } + + double expectedNumPValueIncrements = + numSampledSpansWithGreaterRValueAndSmallPValue + * (numberOfDiscardedSpansWithMaxDiscardedRValue + / (double) + (numberOfDiscardedSpansWithMaxDiscardedRValue + + numberOfSampledSpansWithMaxDiscardedRValue + + 1L)); + int roundedExpectedNumPValueIncrements = + Math.toIntExact(randomGenerator.roundStochastically(expectedNumPValueIncrements)); + + BitSet incrementIndicators = + randomGenerator.generateRandomBitSet( + numSampledSpansWithGreaterRValueAndSmallPValue, roundedExpectedNumPValueIncrements); + + int incrementIndicatorIndex = 0; + List result = new ArrayList<>(queue.size()); + for (ReadableSpanWithPriority readableSpanWithPriority : readableSpansWithPriority) { + if (readableSpanWithPriority.getP() <= maxDiscardedRValue) { + readableSpanWithPriority.setP(maxDiscardedRValue); + if (readableSpanWithPriority.getR() > maxDiscardedRValue) { + if (incrementIndicators.get(incrementIndicatorIndex)) { + readableSpanWithPriority.setP(maxDiscardedRValue + 1); + } + incrementIndicatorIndex += 1; + } + } + + SpanData spanData = readableSpanWithPriority.getReadableSpan().toSpanData(); + SpanContext spanContext = spanData.getSpanContext(); + TraceState traceState = spanContext.getTraceState(); + String otelTraceStateString = traceState.get(OtelTraceState.TRACE_STATE_KEY); + OtelTraceState otelTraceState = OtelTraceState.parse(otelTraceStateString); + if ((!otelTraceState.hasValidR() && readableSpanWithPriority.getP() > 0) + || (otelTraceState.hasValidR() + && readableSpanWithPriority.getP() != otelTraceState.getP())) { + otelTraceState.setP(readableSpanWithPriority.getP()); + spanData = updateSpanDataWithOtelTraceState(spanData, otelTraceState); + } + result.add(spanData); + } + + return result; + } + + public boolean isEmpty() { + return queue.isEmpty(); + } + } + + private static SpanData updateSpanDataWithOtelTraceState( + SpanData spanData, OtelTraceState otelTraceState) { + SpanContext spanContext = spanData.getSpanContext(); + TraceState traceState = spanContext.getTraceState(); + String updatedOtelTraceStateString = otelTraceState.serialize(); + TraceState updatedTraceState = + traceState.toBuilder() + .put(OtelTraceState.TRACE_STATE_KEY, updatedOtelTraceStateString) + .build(); + SpanContext updatedSpanContext = + SpanContext.create( + spanContext.getTraceId(), + spanContext.getSpanId(), + spanContext.getTraceFlags(), + updatedTraceState); + return new DelegatingSpanData(spanData) { + @Override + public SpanContext getSpanContext() { + return updatedSpanContext; + } + }; + } + + // visible for testing + static SpanProcessor create( + SpanExporter spanExporter, + int reservoirSize, + long exportPeriodNanos, + long exporterTimeoutNanos, + RandomGenerator randomGenerator) { + return new ConsistentReservoirSamplingSpanProcessor( + spanExporter, exportPeriodNanos, reservoirSize, exporterTimeoutNanos, randomGenerator); + } + + /** + * Creates a new {@link SpanProcessor} which periodically exports a fixed maximum number of spans. + * If the number of spans in a period exceeds the fixed reservoir (buffer) size, spans will be + * consistently (compare {@link ConsistentSampler}) sampled. + * + * @param spanExporter a span exporter + * @param reservoirSize the reservoir size + * @param exportPeriodNanos the export period in nanoseconds + * @param exporterTimeoutNanos the exporter timeout in nanoseconds + * @return a span processor + */ + public static SpanProcessor create( + SpanExporter spanExporter, + int reservoirSize, + long exportPeriodNanos, + long exporterTimeoutNanos) { + return create( + spanExporter, + reservoirSize, + exportPeriodNanos, + exporterTimeoutNanos, + RandomGenerator.getDefault()); + } + + /** + * Creates a new {@link SpanProcessor} which periodically exports a fixed maximum number of spans. + * If the number of spans in a period exceeds the fixed reservoir (buffer) size, spans will be + * consistently (compare {@link ConsistentSampler}) sampled. + * + * @param spanExporter a span exporter + * @param reservoirSize the reservoir size + * @param exportPeriodNanos the export period in nanoseconds + * @return a span processor + */ + static SpanProcessor create( + SpanExporter spanExporter, int reservoirSize, long exportPeriodNanos) { + return create(spanExporter, reservoirSize, exportPeriodNanos, DEFAULT_EXPORT_TIMEOUT_NANOS); + } + + private ConsistentReservoirSamplingSpanProcessor( + SpanExporter spanExporter, + long exportPeriodNanos, + int reservoirSize, + long exporterTimeoutNanos, + RandomGenerator randomGenerator) { + requireNonNull(spanExporter, "spanExporter"); + checkArgument(exportPeriodNanos > 0, "export period must be positive"); + checkArgument(reservoirSize > 0, "reservoir size must be positive"); + checkArgument(exporterTimeoutNanos > 0, "exporter timeout must be positive"); + requireNonNull(randomGenerator, "randomGenerator"); + + this.worker = + new Worker( + spanExporter, exportPeriodNanos, reservoirSize, exporterTimeoutNanos, randomGenerator); + Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); + workerThread.start(); + } + + @Override + public void onStart(Context parentContext, ReadWriteSpan span) {} + + @Override + public boolean isStartRequired() { + return false; + } + + @Override + public void onEnd(ReadableSpan span) { + if (span == null || !span.getSpanContext().isSampled()) { + return; + } + worker.addSpan(span); + } + + @Override + public boolean isEndRequired() { + return true; + } + + @Override + public CompletableResultCode shutdown() { + if (isShutdown.getAndSet(true)) { + return CompletableResultCode.ofSuccess(); + } + return worker.shutdown(); + } + + @Override + public CompletableResultCode forceFlush() { + return worker.forceFlush(); + } + + // Visible for testing + boolean isReservoirEmpty() { + return worker.isReservoirEmpty(); + } + + private static final class Worker implements Runnable { + + private static final Logger logger = Logger.getLogger(Worker.class.getName()); + private final SpanExporter spanExporter; + private final long exportPeriodNanos; + private final int reservoirSize; + private final long exporterTimeoutNanos; + + private long nextExportTime; + + private final RandomGenerator randomGenerator; + private final Object reservoirLock = new Object(); + private Reservoir reservoir; + private final BlockingQueue signal; + private volatile boolean continueWork = true; + + private static Reservoir createReservoir(int reservoirSize, RandomGenerator randomGenerator) { + return new Reservoir(reservoirSize, randomGenerator); + } + + private Worker( + SpanExporter spanExporter, + long exportPeriodNanos, + int reservoirSize, + long exporterTimeoutNanos, + RandomGenerator randomGenerator) { + this.spanExporter = spanExporter; + this.exportPeriodNanos = exportPeriodNanos; + this.reservoirSize = reservoirSize; + this.exporterTimeoutNanos = exporterTimeoutNanos; + this.randomGenerator = randomGenerator; + synchronized (reservoirLock) { + this.reservoir = createReservoir(reservoirSize, randomGenerator); + } + this.signal = new ArrayBlockingQueue<>(1); + } + + private void addSpan(ReadableSpan span) { + ReadableSpanWithPriority readableSpanWithPriority = + ReadableSpanWithPriority.create(span, randomGenerator); + synchronized (reservoirLock) { + reservoir.add(readableSpanWithPriority); + } + } + + @Override + public void run() { + updateNextExportTime(); + CompletableResultCode completableResultCode = null; + while (continueWork) { + + if (completableResultCode != null || System.nanoTime() >= nextExportTime) { + Reservoir oldReservoir; + Reservoir newReservoir = createReservoir(reservoirSize, randomGenerator); + synchronized (reservoirLock) { + oldReservoir = reservoir; + reservoir = newReservoir; + } + exportCurrentBatch(oldReservoir.getResult()); + updateNextExportTime(); + if (completableResultCode != null) { + completableResultCode.succeed(); + } + } + + try { + long pollWaitTime = nextExportTime - System.nanoTime(); + if (pollWaitTime > 0) { + completableResultCode = signal.poll(pollWaitTime, TimeUnit.NANOSECONDS); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + + private void updateNextExportTime() { + nextExportTime = System.nanoTime() + exportPeriodNanos; + } + + private CompletableResultCode shutdown() { + CompletableResultCode result = new CompletableResultCode(); + + CompletableResultCode flushResult = forceFlush(); + flushResult.whenComplete( + () -> { + continueWork = false; + CompletableResultCode shutdownResult = spanExporter.shutdown(); + shutdownResult.whenComplete( + () -> { + if (!flushResult.isSuccess() || !shutdownResult.isSuccess()) { + result.fail(); + } else { + result.succeed(); + } + }); + }); + + return result; + } + + private CompletableResultCode forceFlush() { + CompletableResultCode flushResult = new CompletableResultCode(); + signal.offer(flushResult); + return flushResult; + } + + private void exportCurrentBatch(List batch) { + if (batch.isEmpty()) { + return; + } + + try { + CompletableResultCode result = spanExporter.export(Collections.unmodifiableList(batch)); + result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS); + if (!result.isSuccess()) { + logger.log(Level.FINE, "Exporter failed"); + } + } catch (RuntimeException e) { + logger.log(Level.WARNING, "Exporter threw an Exception", e); + } finally { + batch.clear(); + } + } + + private boolean isReservoirEmpty() { + synchronized (reservoirLock) { + return reservoir.isEmpty(); + } + } + } +} diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/RandomGenerator.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/RandomGenerator.java index 4363eadeb..f44822aff 100644 --- a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/RandomGenerator.java +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/RandomGenerator.java @@ -7,6 +7,7 @@ import static java.util.Objects.requireNonNull; +import java.util.BitSet; import java.util.concurrent.ThreadLocalRandom; import java.util.function.LongSupplier; @@ -131,4 +132,94 @@ public boolean nextBoolean(double probability) { public int numberOfLeadingZerosOfRandomLong() { return threadLocalData.get().numberOfLeadingZerosOfRandomLong(threadSafeRandomLongSupplier); } + + /** + * Returns a pseudorandomly chosen {@code long} value. + * + * @return a pseudorandomly chosen {@code long} value + */ + public long nextLong() { + return threadSafeRandomLongSupplier.getAsLong(); + } + + /** + * Stochastically rounds the given floating-point value. + * + *

see https://en.wikipedia.org/wiki/Rounding#Stochastic_rounding + * + * @param x the value to be rounded + * @return the rounded value + */ + public long roundStochastically(double x) { + long i = (long) Math.floor(x); + if (nextBoolean(x - i)) { + return i + 1; + } else { + return i; + } + } + + /** + * Returns a pseudorandomly chosen {@code int} value between zero (inclusive) and the specified + * bound (exclusive). + * + *

The implementation is based on Daniel Lemire's algorithm as described in "Fast random + * integer generation in an interval." ACM Transactions on Modeling and Computer Simulation + * (TOMACS) 29.1 (2019): 3. + * + * @param bound the upper bound (exclusive) for the returned value. Must be positive. + * @return a pseudorandomly chosen {@code int} value between zero (inclusive) and the bound + * (exclusive) + * @throws IllegalArgumentException if {@code bound} is not positive + */ + private int nextInt(int bound) { + if (bound <= 0) { + throw new IllegalArgumentException(); + } + long x = nextLong() >>> 33; // use only 31 random bits + long m = x * bound; + int l = (int) m & 0x7FFFFFFF; + if (l < bound) { + int t = (-bound & 0x7FFFFFFF) % bound; + while (l < t) { + x = nextLong() >>> 33; // use only 31 random bits + m = x * bound; + l = (int) m & 0x7FFFFFFF; + } + } + return (int) (m >>> 31); + } + + /** + * Generates a random bit set where a given number of 1-bits are randomly set. + * + * @param numBits the total number of bits + * @param numOneBits the number of 1-bits + * @return a random bit set + * @throws IllegalArgumentException if {@code 0 <= numOneBits <= numBits} is violated + */ + public BitSet generateRandomBitSet(int numBits, int numOneBits) { + + if (numOneBits < 0 || numOneBits > numBits) { + throw new IllegalArgumentException(); + } + + BitSet result = new BitSet(numBits); + int numZeroBits = numBits - numOneBits; + + // based on Fisher-Yates shuffling + for (int i = Math.max(numZeroBits, numOneBits); i < numBits; ++i) { + int j = nextInt(i + 1); + if (result.get(j)) { + result.set(i); + } else { + result.set(j); + } + } + if (numZeroBits < numOneBits) { + result.flip(0, numBits); + } + + return result; + } } diff --git a/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessorTest.java b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessorTest.java new file mode 100644 index 000000000..8c20a644b --- /dev/null +++ b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessorTest.java @@ -0,0 +1,689 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static io.opentelemetry.contrib.samplers.ConsistentReservoirSamplingSpanProcessor.DEFAULT_EXPORT_TIMEOUT_NANOS; +import static io.opentelemetry.contrib.util.TestUtil.verifyObservedPvaluesUsingGtest; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SplittableRandom; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; +import java.util.stream.IntStream; +import javax.annotation.Nullable; +import org.hipparchus.distribution.discrete.BinomialDistribution; +import org.hipparchus.stat.inference.GTest; +import org.hipparchus.stat.inference.TTest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.mockito.ArgumentMatcher; +import org.mockito.ArgumentMatchers; + +class ConsistentReservoirSamplingSpanProcessorTest { + + private static final String SPAN_NAME_1 = "MySpanName/1"; + private static final String SPAN_NAME_2 = "MySpanName/2"; + private static final String SPAN_NAME_3 = "MySpanName/3"; + private static final int RESERVOIR_SIZE = 4096; + private static final long EXPORT_PERIOD_10_MILLIS_AS_NANOS = TimeUnit.MILLISECONDS.toNanos(10); + private static final long EXPORT_PERIOD_100_MILLIS_AS_NANOS = TimeUnit.MILLISECONDS.toNanos(100); + private static final long VERY_LONG_EXPORT_PERIOD_NANOS = TimeUnit.SECONDS.toNanos(10000); + + private static void shutdown(SdkTracerProvider sdkTracerProvider) { + sdkTracerProvider.shutdown().join(1000, TimeUnit.SECONDS); + } + + private static class WaitingSpanExporter implements SpanExporter { + + private final List spanDataList = new ArrayList<>(); + private final int numberOfSpansToWaitFor; + private volatile CountDownLatch countDownLatch; + private final AtomicBoolean shutDownCalled = new AtomicBoolean(false); + + WaitingSpanExporter(int numberOfSpansToWaitFor) { + countDownLatch = new CountDownLatch(numberOfSpansToWaitFor); + this.numberOfSpansToWaitFor = numberOfSpansToWaitFor; + } + + List getExported() { + List result = new ArrayList<>(spanDataList); + spanDataList.clear(); + return result; + } + + /** + * Waits until {@code numberOfSpansToWaitFor} spans have been exported. Returns the list of + * exported {@link SpanData} objects, otherwise {@code null} if the current thread is + * interrupted. + * + * @return the list of exported {@link SpanData} objects, otherwise {@code null} if the current + * thread is interrupted. + */ + @Nullable + List waitForExport() { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + // Preserve the interruption status as per guidance. + Thread.currentThread().interrupt(); + return null; + } + return getExported(); + } + + @Override + public CompletableResultCode export(Collection spans) { + this.spanDataList.addAll(spans); + for (int i = 0; i < spans.size(); i++) { + countDownLatch.countDown(); + } + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + shutDownCalled.set(true); + return CompletableResultCode.ofSuccess(); + } + + public void reset() { + this.countDownLatch = new CountDownLatch(numberOfSpansToWaitFor); + } + } + + @Nullable + private ReadableSpan createEndedSpan(String spanName, SdkTracerProvider sdkTracerProvider) { + Tracer tracer = sdkTracerProvider.get(getClass().getName()); + Span span = tracer.spanBuilder(spanName).startSpan(); + span.end(); + if (span instanceof ReadableSpan) { + return (ReadableSpan) span; + } else { + return null; + } + } + + @Test + void invalidConfig() { + SpanExporter exporter = mock(SpanExporter.class); + when(exporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + assertThatThrownBy(() -> ConsistentReservoirSamplingSpanProcessor.create(null, 1, 1)) + .isInstanceOf(NullPointerException.class) + .hasMessage("spanExporter"); + assertThatThrownBy(() -> ConsistentReservoirSamplingSpanProcessor.create(exporter, -1, 1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("reservoir size must be positive"); + assertThatThrownBy(() -> ConsistentReservoirSamplingSpanProcessor.create(exporter, 1, -1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("export period must be positive"); + assertThatThrownBy(() -> ConsistentReservoirSamplingSpanProcessor.create(exporter, 1, 1, -1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("exporter timeout must be positive"); + assertThatThrownBy( + () -> ConsistentReservoirSamplingSpanProcessor.create(exporter, 1, 1, 1, null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("randomGenerator"); + } + + @Test + void startEndRequirements() { + SpanProcessor processor = + ConsistentReservoirSamplingSpanProcessor.create( + new WaitingSpanExporter(0), RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS); + assertThat(processor.isStartRequired()).isFalse(); + assertThat(processor.isEndRequired()).isTrue(); + } + + @Test + @Timeout(10) + void exportDifferentSampledSpans() { + WaitingSpanExporter exporter = new WaitingSpanExporter(2); + SdkTracerProvider sdkTracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor( + ConsistentReservoirSamplingSpanProcessor.create( + exporter, RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS)) + .build(); + + ReadableSpan span1 = createEndedSpan(SPAN_NAME_1, sdkTracerProvider); + ReadableSpan span2 = createEndedSpan(SPAN_NAME_2, sdkTracerProvider); + ReadableSpan span3 = createEndedSpan(SPAN_NAME_3, sdkTracerProvider); + List exported = exporter.waitForExport(); + assertThat(exported) + .containsExactlyInAnyOrder(span1.toSpanData(), span2.toSpanData(), span3.toSpanData()); + + shutdown(sdkTracerProvider); + } + + @Test + @Timeout(10) + void forceExport() { + WaitingSpanExporter exporter = new WaitingSpanExporter(100); + int reservoirSize = 50; + SpanProcessor processor = + ConsistentReservoirSamplingSpanProcessor.create( + exporter, reservoirSize, VERY_LONG_EXPORT_PERIOD_NANOS); + + SdkTracerProvider sdkTracerProvider = + SdkTracerProvider.builder().addSpanProcessor(processor).build(); + for (int i = 0; i < 100; i++) { + createEndedSpan("MySpanName/" + i, sdkTracerProvider); + } + + processor.forceFlush().join(10, TimeUnit.SECONDS); + List exported = exporter.getExported(); + assertThat(exported).isNotNull(); + assertThat(exported.size()).isEqualTo(reservoirSize); + + shutdown(sdkTracerProvider); + } + + @Test + @Timeout(10) + void exportSpansToMultipleServices() { + WaitingSpanExporter waitingSpanExporter1 = new WaitingSpanExporter(2); + WaitingSpanExporter waitingSpanExporter2 = new WaitingSpanExporter(2); + SdkTracerProvider sdkTracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor( + ConsistentReservoirSamplingSpanProcessor.create( + SpanExporter.composite( + Arrays.asList(waitingSpanExporter1, waitingSpanExporter2)), + RESERVOIR_SIZE, + EXPORT_PERIOD_100_MILLIS_AS_NANOS)) + .build(); + + ReadableSpan span1 = createEndedSpan(SPAN_NAME_1, sdkTracerProvider); + ReadableSpan span2 = createEndedSpan(SPAN_NAME_2, sdkTracerProvider); + List exported1 = waitingSpanExporter1.waitForExport(); + List exported2 = waitingSpanExporter2.waitForExport(); + assertThat(exported1).containsExactlyInAnyOrder(span1.toSpanData(), span2.toSpanData()); + assertThat(exported2).containsExactlyInAnyOrder(span1.toSpanData(), span2.toSpanData()); + + shutdown(sdkTracerProvider); + } + + @Test + void ignoresNullSpans() { + SpanProcessor processor = + ConsistentReservoirSamplingSpanProcessor.create( + mock(SpanExporter.class), RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS); + assertThatCode( + () -> { + processor.onStart(null, null); + processor.onEnd(null); + }) + .doesNotThrowAnyException(); + + processor.shutdown(); + } + + @Test + @Timeout(10) + void exporterThrowsException() { + SpanExporter failingExporter = mock(SpanExporter.class); + when(failingExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + doThrow(new IllegalArgumentException("No export for you.")) + .when(failingExporter) + .export(ArgumentMatchers.anyList()); + + WaitingSpanExporter workingExporter = new WaitingSpanExporter(1); + + SdkTracerProvider sdkTracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor( + ConsistentReservoirSamplingSpanProcessor.create( + SpanExporter.composite(Arrays.asList(failingExporter, workingExporter)), + RESERVOIR_SIZE, + EXPORT_PERIOD_100_MILLIS_AS_NANOS)) + .build(); + + ReadableSpan span1 = createEndedSpan(SPAN_NAME_1, sdkTracerProvider); + List exported = workingExporter.waitForExport(); + assertThat(exported).containsExactly(span1.toSpanData()); + + workingExporter.reset(); + + ReadableSpan span2 = createEndedSpan(SPAN_NAME_2, sdkTracerProvider); + exported = workingExporter.waitForExport(); + assertThat(exported).containsExactly(span2.toSpanData()); + + shutdown(sdkTracerProvider); + } + + private static ArgumentMatcher> containsSpanName( + String spanName, Runnable runOnMatch) { + return spans -> { + assertThat(spans).anySatisfy(span -> assertThat(span.getName()).isEqualTo(spanName)); + runOnMatch.run(); + return true; + }; + } + + private static void awaitReservoirEmpty(SpanProcessor processor) { + await() + .untilAsserted( + () -> + assertThat( + ((ConsistentReservoirSamplingSpanProcessor) processor).isReservoirEmpty()) + .isTrue()); + } + + @Test + @Timeout(10) + public void continuesIfExporterTimesOut() throws InterruptedException { + SpanExporter mockSpanExporter = mock(SpanExporter.class); + when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + + SpanProcessor processor = + ConsistentReservoirSamplingSpanProcessor.create( + mockSpanExporter, + RESERVOIR_SIZE, + EXPORT_PERIOD_10_MILLIS_AS_NANOS, + TimeUnit.MILLISECONDS.toNanos(1)); + SdkTracerProvider sdkTracerProvider = + SdkTracerProvider.builder().addSpanProcessor(processor).build(); + + CountDownLatch exported = new CountDownLatch(1); + + // We return a result we never complete, meaning it will timeout. + when(mockSpanExporter.export(argThat(containsSpanName(SPAN_NAME_1, exported::countDown)))) + .thenReturn(new CompletableResultCode()); + + createEndedSpan(SPAN_NAME_1, sdkTracerProvider); + exported.await(); + + // Timed out so the span was dropped. + awaitReservoirEmpty(processor); + + // Still processing new spans. + CountDownLatch exportedAgain = new CountDownLatch(1); + reset(mockSpanExporter); + when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(mockSpanExporter.export(argThat(containsSpanName(SPAN_NAME_2, exportedAgain::countDown)))) + .thenReturn(CompletableResultCode.ofSuccess()); + createEndedSpan(SPAN_NAME_2, sdkTracerProvider); + exported.await(); + awaitReservoirEmpty(processor); + + shutdown(sdkTracerProvider); + } + + @Test + @Timeout(10) + void exportNotSampledNotRecordedSpans() { + Sampler mockSampler = mock(Sampler.class); + WaitingSpanExporter exporter = new WaitingSpanExporter(1); + SdkTracerProvider sdkTracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor( + ConsistentReservoirSamplingSpanProcessor.create( + exporter, RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS)) + .setSampler(mockSampler) + .build(); + + when(mockSampler.shouldSample(any(), any(), any(), any(), any(), anyList())) + .thenReturn(SamplingResult.drop()); + sdkTracerProvider.get("test").spanBuilder(SPAN_NAME_1).startSpan().end(); + sdkTracerProvider.get("test").spanBuilder(SPAN_NAME_2).startSpan().end(); + when(mockSampler.shouldSample(any(), any(), any(), any(), any(), anyList())) + .thenReturn(SamplingResult.recordAndSample()); + ReadableSpan span = createEndedSpan(SPAN_NAME_2, sdkTracerProvider); + // Spans are recorded and exported in the same order as they are ended, we test that a non + // sampled span is not exported by creating and ending a sampled span after a non sampled span + // and checking that the first exported span is the sampled span (the non sampled did not get + // exported). + List exported = exporter.waitForExport(); + assertThat(exported).containsExactly(span.toSpanData()); + + shutdown(sdkTracerProvider); + } + + @Test + @Timeout(10) + void exportNotSampledButRecordedSpans() { + WaitingSpanExporter exporter = new WaitingSpanExporter(1); + + Sampler mockSampler = mock(Sampler.class); + when(mockSampler.shouldSample(any(), any(), any(), any(), any(), anyList())) + .thenReturn(SamplingResult.recordOnly()); + SdkTracerProvider sdkTracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor( + ConsistentReservoirSamplingSpanProcessor.create( + exporter, RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS)) + .setSampler(mockSampler) + .build(); + + createEndedSpan(SPAN_NAME_1, sdkTracerProvider); + when(mockSampler.shouldSample(any(), any(), any(), any(), any(), anyList())) + .thenReturn(SamplingResult.recordAndSample()); + ReadableSpan span = createEndedSpan(SPAN_NAME_2, sdkTracerProvider); + + // Spans are recorded and exported in the same order as they are ended, we test that a non + // exported span is not exported by creating and ending a sampled span after a non sampled span + // and checking that the first exported span is the sampled span (the non sampled did not get + // exported). + List exported = exporter.waitForExport(); + assertThat(exported).containsExactly(span.toSpanData()); + + shutdown(sdkTracerProvider); + } + + @Test + @Timeout(10) + void shutdownFlushes() { + WaitingSpanExporter exporter = new WaitingSpanExporter(1); + + // Set the export period to large value, in order to confirm the #flush() below works + SdkTracerProvider sdkTracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor( + ConsistentReservoirSamplingSpanProcessor.create( + exporter, RESERVOIR_SIZE, VERY_LONG_EXPORT_PERIOD_NANOS)) + .build(); + + ReadableSpan span = createEndedSpan(SPAN_NAME_1, sdkTracerProvider); + + // Force a shutdown, which forces processing of all remaining spans. + shutdown(sdkTracerProvider); + + List exported = exporter.getExported(); + assertThat(exported).containsExactly(span.toSpanData()); + assertThat(exporter.shutDownCalled.get()).isTrue(); + } + + @Test + void shutdownPropagatesSuccess() { + SpanExporter mockSpanExporter = mock(SpanExporter.class); + when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + + SpanProcessor processor = + ConsistentReservoirSamplingSpanProcessor.create( + mockSpanExporter, RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS); + CompletableResultCode result = processor.shutdown(); + result.join(1, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isTrue(); + } + + @Test + void shutdownPropagatesFailure() { + SpanExporter mockSpanExporter = mock(SpanExporter.class); + when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofFailure()); + + SpanProcessor processor = + ConsistentReservoirSamplingSpanProcessor.create( + mockSpanExporter, RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS); + CompletableResultCode result = processor.shutdown(); + result.join(1, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + } + + @Test + @Timeout(10) + void fullReservoir() { + int reservoirSize = 10; + int numberOfSpans = 100; + + WaitingSpanExporter exporter = new WaitingSpanExporter(reservoirSize); + + SpanProcessor processor = + ConsistentReservoirSamplingSpanProcessor.create( + exporter, reservoirSize, VERY_LONG_EXPORT_PERIOD_NANOS); + + SdkTracerProvider sdkTracerProvider = + SdkTracerProvider.builder() + .setSampler(ConsistentSampler.alwaysOn()) + .addSpanProcessor(processor) + .build(); + + IntStream.range(0, numberOfSpans) + .forEach(i -> createEndedSpan("MySpanName/" + i, sdkTracerProvider)); + + processor.forceFlush().join(10, TimeUnit.SECONDS); + + List exported = exporter.waitForExport(); + assertThat(exported).hasSize(reservoirSize); + + shutdown(sdkTracerProvider); + } + + private enum Tests { + VERIFY_MEAN, + VERIFY_PVALUE_DISTRIBUTION, + VERIFY_ORDER_INDEPENDENCE + } + + private static LongSupplier asThreadSafeLongSupplier(SplittableRandom rng) { + return () -> { + synchronized (rng) { + return rng.nextLong(); + } + }; + } + + /** + * Tests a multi-stage consistent sampling setup consisting of a consistent probability-based + * sampler with predefined sampling probability followed by a reservoir sampling span processor + * with fixed reservoir size. + */ + private void testConsistentSampling( + long seed, + int numCycles, + int numberOfSpans, + int reservoirSize, + double samplingProbability, + EnumSet tests) { + + SplittableRandom rng1 = new SplittableRandom(seed); + SplittableRandom rng2 = rng1.split(); + + WaitingSpanExporter spanExporter = new WaitingSpanExporter(0); + + SpanProcessor processor = + ConsistentReservoirSamplingSpanProcessor.create( + spanExporter, + reservoirSize, + VERY_LONG_EXPORT_PERIOD_NANOS, + DEFAULT_EXPORT_TIMEOUT_NANOS, + RandomGenerator.create(asThreadSafeLongSupplier(rng1))); + + SdkTracerProvider sdkTracerProvider = + SdkTracerProvider.builder() + .setSampler( + ConsistentSampler.probabilityBased( + samplingProbability, RandomGenerator.create(asThreadSafeLongSupplier(rng2)))) + .addSpanProcessor(processor) + .build(); + + Map observedPvalues = new HashMap<>(); + Map spanNameCounts = new HashMap<>(); + + double[] totalAdjustedCounts = new double[numCycles]; + + for (int k = 0; k < numCycles; ++k) { + List spans = new ArrayList<>(numberOfSpans); + for (long i = 0; i < numberOfSpans; ++i) { + ReadableSpan span = createEndedSpan(Long.toString(i), sdkTracerProvider); + if (span != null) { + spans.add(span); + } + } + + if (samplingProbability >= 1.) { + assertThat(spans).hasSize(numberOfSpans); + } + + processor.forceFlush().join(1000, TimeUnit.SECONDS); + + List exported = spanExporter.getExported(); + assertThat(exported).hasSize(Math.min(reservoirSize, spans.size())); + + long totalAdjustedCount = 0; + for (SpanData spanData : exported) { + String traceStateString = + spanData.getSpanContext().getTraceState().get(OtelTraceState.TRACE_STATE_KEY); + OtelTraceState traceState = OtelTraceState.parse(traceStateString); + assertTrue(traceState.hasValidR()); + assertTrue(traceState.hasValidP()); + observedPvalues.merge(traceState.getP(), 1L, Long::sum); + totalAdjustedCount += 1L << traceState.getP(); + spanNameCounts.merge(spanData.getName(), 1L, Long::sum); + } + totalAdjustedCounts[k] = totalAdjustedCount; + } + + long totalNumberOfSpans = numberOfSpans * (long) numCycles; + if (numCycles == 1) { + assertThat(observedPvalues).hasSizeLessThanOrEqualTo(2); + } + if (tests.contains(Tests.VERIFY_MEAN)) { + assertThat(reservoirSize) + .isGreaterThanOrEqualTo( + 100); // require a lower limit on the reservoir size, to justify the assumption of the + // t-test that values are normally distributed + + assertThat(new TTest().tTest(totalNumberOfSpans / (double) numCycles, totalAdjustedCounts)) + .isGreaterThan(0.01); + } + if (tests.contains(Tests.VERIFY_PVALUE_DISTRIBUTION)) { + assertThat(observedPvalues) + .hasSizeLessThanOrEqualTo(2); // test does not work for more than 2 different p-values + + // The expected number of sampled spans is binomially distributed with the given sampling + // probability. However, due to the reservoir sampling buffer the maximum number of sampled + // spans is given by the reservoir size. The effective sampling rate is therefore given by + // sum_{i=0}^n p^i*(1-p)^{n-i}*min(i,k) (n choose i) + // where p denotes the sampling rate, n is the total number of original spans, and k denotes + // the reservoir size + double p1 = + new BinomialDistribution(numberOfSpans - 1, samplingProbability) + .cumulativeProbability(reservoirSize - 1); + double p2 = + new BinomialDistribution(numberOfSpans, samplingProbability) + .cumulativeProbability(reservoirSize); + assertThat(p1).isLessThanOrEqualTo(p2); + + double effectiveSamplingProbability = + samplingProbability * p1 + (reservoirSize / (double) numberOfSpans) * (1. - p2); + verifyObservedPvaluesUsingGtest( + totalNumberOfSpans, observedPvalues, effectiveSamplingProbability); + } + if (tests.contains(Tests.VERIFY_ORDER_INDEPENDENCE)) { + assertThat(spanNameCounts.size()).isEqualTo(numberOfSpans); + long[] observed = spanNameCounts.values().stream().mapToLong(x -> x).toArray(); + double[] expected = new double[numberOfSpans]; + Arrays.fill(expected, 1.); + assertThat(new GTest().gTest(expected, observed)).isGreaterThan(0.01); + } + + shutdown(sdkTracerProvider); + } + + @Test + @Timeout(1000) + void testConsistentSampling() { + testConsistentSampling( + 0x34e7052af91d5355L, + 1000, + 1000, + 100, + 1., + EnumSet.of(Tests.VERIFY_MEAN, Tests.VERIFY_ORDER_INDEPENDENCE)); + testConsistentSampling( + 0x44ec62de12a422b4L, + 1000, + 1000, + 100, + 0.8, + EnumSet.of(Tests.VERIFY_MEAN, Tests.VERIFY_ORDER_INDEPENDENCE)); + testConsistentSampling( + 0x2c3d086534e14407L, + 1000, + 1000, + 100, + 0.1, + EnumSet.of( + Tests.VERIFY_MEAN, Tests.VERIFY_PVALUE_DISTRIBUTION, Tests.VERIFY_ORDER_INDEPENDENCE)); + testConsistentSampling( + 0xd3f8a40433cf0522L, + 1000, + 1000, + 200, + 0.9, + EnumSet.of(Tests.VERIFY_MEAN, Tests.VERIFY_ORDER_INDEPENDENCE)); + testConsistentSampling( + 0xf25638ca67eceadcL, 10000, 100, 100, 1.0, EnumSet.of(Tests.VERIFY_MEAN)); + testConsistentSampling( + 0x14c5f8f815618ce2L, + 1000, + 200, + 100, + 1.0, + EnumSet.of(Tests.VERIFY_MEAN, Tests.VERIFY_ORDER_INDEPENDENCE)); + testConsistentSampling( + 0xb6c27f1169e128ddL, + 1000, + 1000, + 200, + 0.2, + EnumSet.of( + Tests.VERIFY_MEAN, Tests.VERIFY_PVALUE_DISTRIBUTION, Tests.VERIFY_ORDER_INDEPENDENCE)); + testConsistentSampling( + 0xab558ff7c5c73c18L, + 1000, + 10000, + 200, + 1., + EnumSet.of( + Tests.VERIFY_MEAN, Tests.VERIFY_PVALUE_DISTRIBUTION, Tests.VERIFY_ORDER_INDEPENDENCE)); + testConsistentSampling( + 0xe53010c4b009a6c0L, + 1000, + 1000, + 2000, + 0.2, + EnumSet.of( + Tests.VERIFY_MEAN, Tests.VERIFY_PVALUE_DISTRIBUTION, Tests.VERIFY_ORDER_INDEPENDENCE)); + testConsistentSampling( + 0xc41d327fd1a6866aL, 1000000, 5, 4, 1.0, EnumSet.of(Tests.VERIFY_ORDER_INDEPENDENCE)); + } +} diff --git a/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/RandomGeneratorTest.java b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/RandomGeneratorTest.java new file mode 100644 index 000000000..2a8669f6a --- /dev/null +++ b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/RandomGeneratorTest.java @@ -0,0 +1,78 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.samplers; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.util.BitSet; +import java.util.SplittableRandom; +import java.util.stream.DoubleStream; +import org.hipparchus.stat.inference.GTest; +import org.junit.jupiter.api.Test; + +public class RandomGeneratorTest { + + private static void testGenerateRandomBitSet(long seed, int numBits, int numOneBits) { + + int numCycles = 100000; + + SplittableRandom splittableRandom = new SplittableRandom(seed); + RandomGenerator randomGenerator = RandomGenerator.create(splittableRandom::nextLong); + + long[] observed = new long[numBits]; + double[] expected = DoubleStream.generate(() -> 1.).limit(numBits).toArray(); + + for (int i = 0; i < numCycles; ++i) { + BitSet bitSet = randomGenerator.generateRandomBitSet(numBits, numOneBits); + bitSet.stream().forEach(k -> observed[k] += 1); + assertThat(bitSet.cardinality()).isEqualTo(numOneBits); + } + if (numBits > 1) { + assertThat(new GTest().gTest(expected, observed)).isGreaterThan(0.01); + } else if (numBits == 1) { + assertThat(observed[0]).isEqualTo(numOneBits * (long) numCycles); + } else { + fail("numBits was non-positive!"); + } + } + + @Test + void testGenerateRandomBitSet() { + testGenerateRandomBitSet(0x4a5580b958d52182L, 1, 0); + testGenerateRandomBitSet(0x529dff14b0ce7414L, 1, 1); + testGenerateRandomBitSet(0x2d3f673a9e1da536L, 2, 0); + testGenerateRandomBitSet(0xb9a6735e64361bacL, 2, 1); + testGenerateRandomBitSet(0xb5aafedc7031506fL, 2, 2); + testGenerateRandomBitSet(0xaecabe7698971ee1L, 3, 0); + testGenerateRandomBitSet(0x119ccf35dc52b34dL, 3, 1); + testGenerateRandomBitSet(0xcaf2b7a98f194ce2L, 3, 2); + testGenerateRandomBitSet(0xe28e8cc3d3de0c2aL, 3, 3); + testGenerateRandomBitSet(0xb69989dce9cc8b34L, 4, 0); + testGenerateRandomBitSet(0x6575d4c848c95dc8L, 4, 1); + testGenerateRandomBitSet(0xed0ad0525ad632e9L, 4, 2); + testGenerateRandomBitSet(0x34db9303b405a706L, 4, 3); + testGenerateRandomBitSet(0x8e97972893044140L, 4, 4); + testGenerateRandomBitSet(0x47f966b8f28dac77L, 5, 0); + testGenerateRandomBitSet(0x7996db4a5f1e4680L, 5, 1); + testGenerateRandomBitSet(0x577fcf18bbc0ba30L, 5, 2); + testGenerateRandomBitSet(0x36b1ed999d2986b0L, 5, 3); + testGenerateRandomBitSet(0xa8e099ed958d03bbL, 5, 4); + testGenerateRandomBitSet(0xc2b50bbf3263b414L, 5, 5); + testGenerateRandomBitSet(0x2994550582b091e9L, 6, 0); + testGenerateRandomBitSet(0xd2797c539136f6faL, 6, 1); + testGenerateRandomBitSet(0xf3ffae1d93983fd9L, 6, 2); + testGenerateRandomBitSet(0x281e0f9873455ea6L, 6, 3); + testGenerateRandomBitSet(0x5344c2887e30d621L, 6, 4); + testGenerateRandomBitSet(0xa8f4ed6e3e1cf385L, 6, 5); + testGenerateRandomBitSet(0x6bd0f9f11520ae57L, 6, 6); + + testGenerateRandomBitSet(0x514f52732c193e62L, 1000, 1); + testGenerateRandomBitSet(0xe214063ae29d9802L, 1000, 10); + testGenerateRandomBitSet(0x602fdb45063e7b0fL, 1000, 990); + testGenerateRandomBitSet(0xe0ef0cb214de3ec0L, 1000, 999); + } +} diff --git a/consistent-sampling/src/test/java/io/opentelemetry/contrib/util/TestUtil.java b/consistent-sampling/src/test/java/io/opentelemetry/contrib/util/TestUtil.java new file mode 100644 index 000000000..af196c1e9 --- /dev/null +++ b/consistent-sampling/src/test/java/io/opentelemetry/contrib/util/TestUtil.java @@ -0,0 +1,77 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.util; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.HashMap; +import java.util.Map; +import org.hipparchus.stat.inference.GTest; + +public final class TestUtil { + + private TestUtil() {} + + public static void verifyObservedPvaluesUsingGtest( + long originalNumberOfSpans, Map observedPvalues, double samplingProbability) { + + Object notSampled = + new Object() { + @Override + public String toString() { + return "NOT SAMPLED"; + } + }; + + Map expectedProbabilities = new HashMap<>(); + if (samplingProbability >= 1.) { + expectedProbabilities.put(0, 1.); + } else if (samplingProbability <= 0.) { + expectedProbabilities.put(notSampled, 1.); + } else { + int exponent = 0; + while (true) { + if (Math.pow(0.5, exponent + 1) < samplingProbability + && Math.pow(0.5, exponent) >= samplingProbability) { + break; + } + exponent += 1; + } + if (samplingProbability == Math.pow(0.5, exponent)) { + expectedProbabilities.put(notSampled, 1 - samplingProbability); + expectedProbabilities.put(exponent, samplingProbability); + } else { + expectedProbabilities.put(notSampled, 1 - samplingProbability); + expectedProbabilities.put(exponent, 2 * samplingProbability - Math.pow(0.5, exponent)); + expectedProbabilities.put(exponent + 1, Math.pow(0.5, exponent) - samplingProbability); + } + } + + Map extendedObservedAdjustedCounts = new HashMap<>(observedPvalues); + long numberOfSpansNotSampled = + originalNumberOfSpans - observedPvalues.values().stream().mapToLong(i -> i).sum(); + if (numberOfSpansNotSampled > 0) { + extendedObservedAdjustedCounts.put(notSampled, numberOfSpansNotSampled); + } + + double[] expectedValues = new double[expectedProbabilities.size()]; + long[] observedValues = new long[expectedProbabilities.size()]; + + int counter = 0; + for (Object key : expectedProbabilities.keySet()) { + observedValues[counter] = extendedObservedAdjustedCounts.getOrDefault(key, 0L); + double p = expectedProbabilities.get(key); + expectedValues[counter] = p * originalNumberOfSpans; + counter += 1; + } + + if (expectedProbabilities.size() > 1) { + assertThat(new GTest().gTest(expectedValues, observedValues)).isGreaterThan(0.01); + } else { + assertThat((double) observedValues[0]).isEqualTo(expectedValues[0]); + } + } +} From e6d726c19e88243998b8396c7357ebf5614766b9 Mon Sep 17 00:00:00 2001 From: Otmar Ertl Date: Wed, 15 Jun 2022 08:59:18 +0200 Subject: [PATCH 3/3] made constant package private --- .../samplers/ConsistentReservoirSamplingSpanProcessor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessor.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessor.java index bcda18e62..cb2ae54dc 100644 --- a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessor.java +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessor.java @@ -45,7 +45,8 @@ public final class ConsistentReservoirSamplingSpanProcessor implements SpanProce private final Worker worker; private final AtomicBoolean isShutdown = new AtomicBoolean(false); - public static final long DEFAULT_EXPORT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30); + // visible for testing + static final long DEFAULT_EXPORT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30); private static final class ReadableSpanWithPriority {