Skip to content

Commit

Permalink
Stdout exporter for span and metrics (#6750)
Browse files Browse the repository at this point in the history
  • Loading branch information
zeitlinger authored Oct 9, 2024
1 parent eb53fe3 commit b927d9d
Show file tree
Hide file tree
Showing 35 changed files with 1,704 additions and 720 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregationUtil;
import java.net.URI;
Expand Down Expand Up @@ -96,5 +98,95 @@ public static void configureHistogramDefaultAggregation(
}
}

/**
* Invoke the {@code aggregationTemporalitySelectorConsumer} with the configured {@link
* AggregationTemporality}.
*/
public static void configureOtlpAggregationTemporality(
ConfigProperties config,
Consumer<AggregationTemporalitySelector> aggregationTemporalitySelectorConsumer) {
String temporalityStr = config.getString("otel.exporter.otlp.metrics.temporality.preference");
if (temporalityStr == null) {
return;
}
AggregationTemporalitySelector temporalitySelector;
switch (temporalityStr.toLowerCase(Locale.ROOT)) {
case "cumulative":
temporalitySelector = AggregationTemporalitySelector.alwaysCumulative();
break;
case "delta":
temporalitySelector = AggregationTemporalitySelector.deltaPreferred();
break;
case "lowmemory":
temporalitySelector = AggregationTemporalitySelector.lowMemory();
break;
default:
throw new ConfigurationException("Unrecognized aggregation temporality: " + temporalityStr);
}
aggregationTemporalitySelectorConsumer.accept(temporalitySelector);
}

public static void configureOtlpAggregationTemporality(
StructuredConfigProperties config,
Consumer<AggregationTemporalitySelector> aggregationTemporalitySelectorConsumer) {
String temporalityStr = config.getString("temporality_preference");
if (temporalityStr == null) {
return;
}
AggregationTemporalitySelector temporalitySelector;
switch (temporalityStr.toLowerCase(Locale.ROOT)) {
case "cumulative":
temporalitySelector = AggregationTemporalitySelector.alwaysCumulative();
break;
case "delta":
temporalitySelector = AggregationTemporalitySelector.deltaPreferred();
break;
case "lowmemory":
temporalitySelector = AggregationTemporalitySelector.lowMemory();
break;
default:
throw new ConfigurationException("Unrecognized temporality_preference: " + temporalityStr);
}
aggregationTemporalitySelectorConsumer.accept(temporalitySelector);
}

/**
* Invoke the {@code defaultAggregationSelectorConsumer} with the configured {@link
* DefaultAggregationSelector}.
*/
public static void configureOtlpHistogramDefaultAggregation(
ConfigProperties config,
Consumer<DefaultAggregationSelector> defaultAggregationSelectorConsumer) {
String defaultHistogramAggregation =
config.getString("otel.exporter.otlp.metrics.default.histogram.aggregation");
if (defaultHistogramAggregation != null) {
configureHistogramDefaultAggregation(
defaultHistogramAggregation, defaultAggregationSelectorConsumer);
}
}

/**
* Invoke the {@code defaultAggregationSelectorConsumer} with the configured {@link
* DefaultAggregationSelector}.
*/
public static void configureOtlpHistogramDefaultAggregation(
StructuredConfigProperties config,
Consumer<DefaultAggregationSelector> defaultAggregationSelectorConsumer) {
String defaultHistogramAggregation = config.getString("default_histogram_aggregation");
if (defaultHistogramAggregation == null) {
return;
}
if (AggregationUtil.aggregationName(Aggregation.base2ExponentialBucketHistogram())
.equalsIgnoreCase(defaultHistogramAggregation)) {
defaultAggregationSelectorConsumer.accept(
DefaultAggregationSelector.getDefault()
.with(InstrumentType.HISTOGRAM, Aggregation.base2ExponentialBucketHistogram()));
} else if (!AggregationUtil.aggregationName(explicitBucketHistogram())
.equalsIgnoreCase(defaultHistogramAggregation)) {
throw new ConfigurationException(
"Unrecognized default_histogram_aggregation: " + defaultHistogramAggregation);
}
}

private ExporterBuilderUtil() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,14 @@

package io.opentelemetry.exporter.logging.otlp;

import static io.opentelemetry.exporter.logging.otlp.internal.writer.JsonUtil.JSON_FACTORY;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.io.SegmentedStringWriter;
import io.opentelemetry.exporter.internal.otlp.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.logging.otlp.internal.writer.JsonUtil;
import io.opentelemetry.exporter.logging.otlp.internal.metrics.OtlpStdoutMetricExporter;
import io.opentelemetry.exporter.logging.otlp.internal.metrics.OtlpStdoutMetricExporterBuilder;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
Expand All @@ -31,30 +24,49 @@ public final class OtlpJsonLoggingMetricExporter implements MetricExporter {
private static final Logger logger =
Logger.getLogger(OtlpJsonLoggingMetricExporter.class.getName());

private final AtomicBoolean isShutdown = new AtomicBoolean();

private final AggregationTemporality aggregationTemporality;

private final OtlpStdoutMetricExporter delegate;

/**
* Returns a new {@link OtlpJsonLoggingMetricExporter} with a aggregation temporality of {@link
* AggregationTemporality#CUMULATIVE}.
*/
public static MetricExporter create() {
return new OtlpJsonLoggingMetricExporter(AggregationTemporality.CUMULATIVE);
return create(AggregationTemporality.CUMULATIVE);
}

/**
* Returns a new {@link OtlpJsonLoggingMetricExporter} with the given {@code
* aggregationTemporality}.
*/
public static MetricExporter create(AggregationTemporality aggregationTemporality) {
return new OtlpJsonLoggingMetricExporter(aggregationTemporality);
OtlpStdoutMetricExporter delegate =
new OtlpStdoutMetricExporterBuilder(logger).setWrapperJsonObject(false).build();
return new OtlpJsonLoggingMetricExporter(delegate, aggregationTemporality);
}

private OtlpJsonLoggingMetricExporter(AggregationTemporality aggregationTemporality) {
OtlpJsonLoggingMetricExporter(
OtlpStdoutMetricExporter delegate, AggregationTemporality aggregationTemporality) {
this.delegate = delegate;
this.aggregationTemporality = aggregationTemporality;
}

@Override
public CompletableResultCode export(Collection<MetricData> logs) {
return delegate.export(logs);
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}

/**
* Return the aggregation temporality.
*
Expand All @@ -69,41 +81,4 @@ public AggregationTemporality getPreferredTemporality() {
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return aggregationTemporality;
}

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}

ResourceMetricsMarshaler[] allResourceMetrics = ResourceMetricsMarshaler.create(metrics);
for (ResourceMetricsMarshaler resourceMetrics : allResourceMetrics) {
SegmentedStringWriter sw = new SegmentedStringWriter(JSON_FACTORY._getBufferRecycler());
try (JsonGenerator gen = JsonUtil.create(sw)) {
resourceMetrics.writeJsonTo(gen);
} catch (IOException e) {
// Shouldn't happen in practice, just skip it.
continue;
}
try {
logger.log(Level.INFO, sw.getAndClear());
} catch (IOException e) {
logger.log(Level.WARNING, "Unable to read OTLP JSON metrics", e);
}
}
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
}
return CompletableResultCode.ofSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,12 @@

package io.opentelemetry.exporter.logging.otlp;

import static io.opentelemetry.exporter.logging.otlp.internal.writer.JsonUtil.JSON_FACTORY;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.io.SegmentedStringWriter;
import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler;
import io.opentelemetry.exporter.logging.otlp.internal.writer.JsonUtil;
import io.opentelemetry.exporter.logging.otlp.internal.traces.OtlpStdoutSpanExporter;
import io.opentelemetry.exporter.logging.otlp.internal.traces.OtlpStdoutSpanExporterBuilder;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
Expand All @@ -29,49 +22,31 @@ public final class OtlpJsonLoggingSpanExporter implements SpanExporter {
private static final Logger logger =
Logger.getLogger(OtlpJsonLoggingSpanExporter.class.getName());

private final AtomicBoolean isShutdown = new AtomicBoolean();
private final OtlpStdoutSpanExporter delegate;

/** Returns a new {@link OtlpJsonLoggingSpanExporter}. */
public static SpanExporter create() {
return new OtlpJsonLoggingSpanExporter();
OtlpStdoutSpanExporter delegate =
new OtlpStdoutSpanExporterBuilder(logger).setWrapperJsonObject(false).build();
return new OtlpJsonLoggingSpanExporter(delegate);
}

private OtlpJsonLoggingSpanExporter() {}
OtlpJsonLoggingSpanExporter(OtlpStdoutSpanExporter delegate) {
this.delegate = delegate;
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}

ResourceSpansMarshaler[] allResourceSpans = ResourceSpansMarshaler.create(spans);
for (ResourceSpansMarshaler resourceSpans : allResourceSpans) {
SegmentedStringWriter sw = new SegmentedStringWriter(JSON_FACTORY._getBufferRecycler());
try (JsonGenerator gen = JsonUtil.create(sw)) {
resourceSpans.writeJsonTo(gen);
} catch (IOException e) {
// Shouldn't happen in practice, just skip it.
continue;
}
try {
logger.log(Level.INFO, sw.getAndClear());
} catch (IOException e) {
logger.log(Level.WARNING, "Unable to read OTLP JSON spans", e);
}
}
return CompletableResultCode.ofSuccess();
public CompletableResultCode export(Collection<SpanData> logs) {
return delegate.export(logs);
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
}
return CompletableResultCode.ofSuccess();
return delegate.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class LoggingLogRecordExporterProvider implements ConfigurableLogRecordExporterProvider {
public final class LoggingLogRecordExporterProvider
implements ConfigurableLogRecordExporterProvider {

@Override
public LogRecordExporter createExporter(ConfigProperties config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class OtlpStdoutLogRecordExporter implements LogRecordExporter {
public final class OtlpStdoutLogRecordExporter implements LogRecordExporter {

private static final Logger LOGGER =
Logger.getLogger(OtlpStdoutLogRecordExporter.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class OtlpStdoutLogRecordExporterComponentProvider
public final class OtlpStdoutLogRecordExporterComponentProvider
implements ComponentProvider<LogRecordExporter> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class OtlpStdoutLogRecordExporterProvider implements ConfigurableLogRecordExporterProvider {
public final class OtlpStdoutLogRecordExporterProvider
implements ConfigurableLogRecordExporterProvider {
@Override
public LogRecordExporter createExporter(ConfigProperties config) {
OtlpStdoutLogRecordExporterBuilder builder = OtlpStdoutLogRecordExporter.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.logging.otlp.internal;
package io.opentelemetry.exporter.logging.otlp.internal.metrics;

import io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingMetricExporter;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
Expand All @@ -16,7 +16,8 @@
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class LoggingMetricExporterProvider implements ConfigurableMetricExporterProvider {
public final class LoggingMetricExporterProvider implements ConfigurableMetricExporterProvider {

@Override
public MetricExporter createExporter(ConfigProperties config) {
return OtlpJsonLoggingMetricExporter.create();
Expand Down
Loading

0 comments on commit b927d9d

Please sign in to comment.