Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[telemetry] remove dev mode; include dev metrics by default #157

Merged
merged 3 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ String tag() {
public static final int SOCKET_BUFFER_BYTES = -1;
public static final boolean DEFAULT_BLOCKING = false;
public static final boolean DEFAULT_ENABLE_TELEMETRY = true;
public static final boolean DEFAULT_ENABLE_DEVMODE = false;
public static final boolean DEFAULT_ENABLE_AGGREGATION = true;

public static final boolean DEFAULT_ENABLE_AGGREGATION = false;

public static final String CLIENT_TAG = "client:java";
public static final String CLIENT_VERSION_TAG = "client_version:";
Expand Down Expand Up @@ -211,8 +211,6 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
* Boolean to enable client telemetry.
* @param telemetryFlushInterval
* Telemetry flush interval integer, in milliseconds.
* @param enableDevMode
* Boolean to enable client telemetry in dev-mode.
* @param aggregationFlushInterval
* Aggregation flush interval integer, in milliseconds. 0 disables aggregation.
* @param aggregationShards
Expand All @@ -225,12 +223,12 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, final S
final Callable<SocketAddress> telemetryAddressLookup, final int timeout, final int bufferSize,
final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers,
final int senderWorkers, boolean blocking, final boolean enableTelemetry, final int telemetryFlushInterval,
final boolean enableDevMode, final int aggregationFlushInterval, final int aggregationShards)
final int aggregationFlushInterval, final int aggregationShards)
throws StatsDClientException {

this(prefix, queueSize, constantTags, errorHandler, addressLookup, telemetryAddressLookup, timeout,
bufferSize, maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, blocking,
enableTelemetry, telemetryFlushInterval, enableDevMode, aggregationFlushInterval, aggregationShards,
enableTelemetry, telemetryFlushInterval, aggregationFlushInterval, aggregationShards,
null);
}

Expand All @@ -242,8 +240,8 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
final Callable<SocketAddress> telemetryAddressLookup, final int timeout, final int bufferSize,
final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers,
final int senderWorkers, boolean blocking, final boolean enableTelemetry, final int telemetryFlushInterval,
final boolean enableDevMode, final int aggregationFlushInterval, final int aggregationShards,
final ThreadFactory customThreadFactory) throws StatsDClientException {
final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory customThreadFactory)
throws StatsDClientException {

if ((prefix != null) && (!prefix.isEmpty())) {
this.prefix = prefix + ".";
Expand Down Expand Up @@ -341,7 +339,6 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
this.telemetry = new Telemetry.Builder()
.tags(telemetryTags)
.processor(telemetryStatsDProcessor)
.devMode(enableDevMode)
.build();

statsDSender = createSender(addressLookup, handler, clientChannel, statsDProcessor.getBufferPool(),
Expand Down Expand Up @@ -392,7 +389,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
builder.socketBufferSize, builder.maxPacketSizeBytes, builder.entityID,
builder.bufferPoolSize, builder.processorWorkers, builder.senderWorkers,
builder.blocking, builder.enableTelemetry, builder.telemetryFlushInterval,
builder.enableDevMode, (builder.enableAggregation ? builder.aggregationFlushInterval : 0),
(builder.enableAggregation ? builder.aggregationFlushInterval : 0),
builder.aggregationShards, builder.threadFactory);
}

Expand Down Expand Up @@ -1008,7 +1005,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[

this(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout,
bufferSize, maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers,
blocking, enableTelemetry, telemetryFlushInterval, DEFAULT_ENABLE_DEVMODE, 0, 0);
blocking, enableTelemetry, telemetryFlushInterval, 0, 0);
}

protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public class NonBlockingStatsDClientBuilder implements Cloneable {
public int senderWorkers = NonBlockingStatsDClient.DEFAULT_SENDER_WORKERS;
public boolean blocking = NonBlockingStatsDClient.DEFAULT_BLOCKING;
public boolean enableTelemetry = NonBlockingStatsDClient.DEFAULT_ENABLE_TELEMETRY;
public boolean enableDevMode = NonBlockingStatsDClient.DEFAULT_ENABLE_DEVMODE;
public boolean enableAggregation = NonBlockingStatsDClient.DEFAULT_ENABLE_AGGREGATION;
public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL;
public int aggregationFlushInterval = StatsDAggregator.DEFAULT_FLUSH_INTERVAL;
Expand Down Expand Up @@ -143,11 +142,6 @@ public NonBlockingStatsDClientBuilder enableTelemetry(boolean val) {
return this;
}

public NonBlockingStatsDClientBuilder enableDevMode(boolean val) {
enableDevMode = val;
return this;
}

public NonBlockingStatsDClientBuilder enableAggregation(boolean val) {
enableAggregation = val;
return this;
Expand Down
28 changes: 13 additions & 15 deletions src/main/java/com/timgroup/statsd/StatsDAggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,19 @@ public boolean aggregateMessage(Message message) {
if (telemetry != null) {
telemetry.incrAggregatedContexts(1);

if (telemetry.getDevMode()) {

switch (message.getType()) {
case GAUGE:
telemetry.incrAggregatedGaugeContexts(1);
break;
case COUNT:
telemetry.incrAggregatedCountContexts(1);
break;
case SET:
telemetry.incrAggregatedSetContexts(1);
break;
default:
break;
}
// developer metrics
switch (message.getType()) {
case GAUGE:
telemetry.incrAggregatedGaugeContexts(1);
break;
case COUNT:
telemetry.incrAggregatedCountContexts(1);
break;
case SET:
telemetry.incrAggregatedSetContexts(1);
break;
default:
break;
}
}
}
Expand Down
96 changes: 37 additions & 59 deletions src/main/java/com/timgroup/statsd/Telemetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public class Telemetry {
protected final String aggregatedContextsByTypeMetric = "datadog.dogstatsd.client.aggregated_context_by_type";

protected String tags;
protected boolean devMode;
protected StringBuilder devModeBuilder = new StringBuilder();
protected StringBuilder tagBuilder = new StringBuilder();

public StatsDProcessor processor;
protected Timer timer;
Expand Down Expand Up @@ -81,18 +80,16 @@ public final void writeTo(StringBuilder builder) {
}
}

Telemetry(final String tags, final StatsDProcessor processor, final boolean devMode) {
Telemetry(final String tags, final StatsDProcessor processor) {
// precompute metrics lines with tags
this.tags = tags;
this.processor = processor;
this.devMode = devMode;
this.timer = null;
}

public static class Builder {
private String tags;
private StatsDProcessor processor;
private boolean devMode;

public Builder() {}

Expand All @@ -106,13 +103,8 @@ public Builder processor(StatsDProcessor processor) {
return this;
}

public Builder devMode(boolean devMode) {
this.devMode = devMode;
return this;
}

public Telemetry build() {
return new Telemetry(this.tags, this.processor, this.devMode);
return new Telemetry(this.tags, this.processor);
}
}

Expand Down Expand Up @@ -155,55 +147,51 @@ public void flush() {
processor.send(new TelemetryMessage(this.packetsDroppedQueueMetric, this.packetsDroppedQueue.getAndSet(0), tags));
processor.send(new TelemetryMessage(this.aggregatedContextsMetric, this.aggregatedContexts.getAndSet(0), tags));

if (devMode) {
processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.gaugeSent.getAndSet(0),
getTelemetryTags(tags, Message.Type.GAUGE)));
processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.countSent.getAndSet(0),
getTelemetryTags(tags, Message.Type.COUNT)));
processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.setSent.getAndSet(0),
getTelemetryTags(tags, Message.Type.SET)));
processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.histogramSent.getAndSet(0),
getTelemetryTags(tags, Message.Type.HISTOGRAM)));
processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.distributionSent.getAndSet(0),
getTelemetryTags(tags, Message.Type.DISTRIBUTION)));

processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedGaugeContexts.getAndSet(0),
getTelemetryTags(tags, Message.Type.GAUGE)));
processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedCountContexts.getAndSet(0),
getTelemetryTags(tags, Message.Type.COUNT)));
processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedSetContexts.getAndSet(0),
getTelemetryTags(tags, Message.Type.SET)));
}
// developer metrics
processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.gaugeSent.getAndSet(0),
getTelemetryTags(tags, Message.Type.GAUGE)));
processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.countSent.getAndSet(0),
getTelemetryTags(tags, Message.Type.COUNT)));
processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.setSent.getAndSet(0),
getTelemetryTags(tags, Message.Type.SET)));
processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.histogramSent.getAndSet(0),
getTelemetryTags(tags, Message.Type.HISTOGRAM)));
processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.distributionSent.getAndSet(0),
getTelemetryTags(tags, Message.Type.DISTRIBUTION)));

processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedGaugeContexts.getAndSet(0),
getTelemetryTags(tags, Message.Type.GAUGE)));
processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedCountContexts.getAndSet(0),
getTelemetryTags(tags, Message.Type.COUNT)));
processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedSetContexts.getAndSet(0),
getTelemetryTags(tags, Message.Type.SET)));
}

protected String getTelemetryTags(String tags, Message.Type type) {
if (!devMode) {
return tags;
}

devModeBuilder.setLength(0);
devModeBuilder.append(tags);
tagBuilder.setLength(0);
tagBuilder.append(tags);
switch (type) {
case GAUGE:
devModeBuilder.append(",metrics_type:gauge");
tagBuilder.append(",metrics_type:gauge");
break;
case COUNT:
devModeBuilder.append(",metrics_type:count");
tagBuilder.append(",metrics_type:count");
break;
case SET:
devModeBuilder.append(",metrics_type:set");
tagBuilder.append(",metrics_type:set");
break;
case HISTOGRAM:
devModeBuilder.append(",metrics_type:histogram");
tagBuilder.append(",metrics_type:histogram");
break;
case DISTRIBUTION:
devModeBuilder.append(",metrics_type:distribution");
tagBuilder.append(",metrics_type:distribution");
break;
default:
break;
}

return devModeBuilder.toString();
return tagBuilder.toString();
}

/**
Expand Down Expand Up @@ -327,17 +315,15 @@ public void reset() {
this.packetsDroppedQueue.set(0);
this.aggregatedContexts.set(0);

if (devMode) {
this.gaugeSent.set(0);
this.countSent.set(0);
this.histogramSent.set(0);
this.distributionSent.set(0);
this.setSent.set(0);
this.gaugeSent.set(0);
this.countSent.set(0);
this.histogramSent.set(0);
this.distributionSent.set(0);
this.setSent.set(0);

this.aggregatedGaugeContexts.set(0);
this.aggregatedCountContexts.set(0);
this.aggregatedSetContexts.set(0);
}
this.aggregatedGaugeContexts.set(0);
this.aggregatedCountContexts.set(0);
this.aggregatedSetContexts.set(0);
}

/**
Expand All @@ -347,12 +333,4 @@ public void reset() {
public String getTags() {
return this.tags;
}

/**
* Gets the dev mode setting value.
* @return this Telemetry instance dev mode setting.
*/
public boolean getDevMode() {
return this.devMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ public void sends_telemetry_elsewhere() throws Exception {

// 8 messages in telemetry batch
final List<String> messages = telemetryServer.messagesReceived();
assertEquals(9, messages.size());
assertEquals(17, messages.size());
assertThat(messages, hasItem(startsWith("datadog.dogstatsd.client.metrics:1|c")));
assertThat(messages, hasItem(startsWith("datadog.dogstatsd.client.events:0|c")));
assertThat(messages, hasItem(startsWith("datadog.dogstatsd.client.service_checks:0|c")));
Expand Down Expand Up @@ -1231,7 +1231,7 @@ public void testAggregationTelemetry() throws Exception {

List<String> messages = server.messagesReceived();

assertThat(messages.size(), comparesEqualTo(3+9));
assertThat(messages.size(), comparesEqualTo(3+17));
assertThat(messages, hasItem(startsWith("datadog.dogstatsd.client.aggregated_context:27|c")));

} finally {
Expand Down Expand Up @@ -1339,7 +1339,7 @@ private static class SlowStatsDNonBlockingStatsDClient extends NonBlockingStatsD
final int senderWorkers, boolean blocking) throws StatsDClientException {

super(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout,bufferSize,
maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, blocking, false, 0, false, 0, 0);
maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, blocking, false, 0, 0, 0);
lock = new CountDownLatch(1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ public static void start() throws Exception {
// set telemetry
Telemetry telemetry = new Telemetry.Builder()
.processor(fakeProcessor)
.devMode(true)
.build();
fakeProcessor.setTelemetry(telemetry);

Expand Down
Loading