From 9f6050ddaea49029d042ce527c5f771af19fd82e Mon Sep 17 00:00:00 2001 From: Jaime Fullaondo Date: Tue, 14 Sep 2021 09:55:07 -0500 Subject: [PATCH 1/2] [telemetry] remove dev mode; include dev metrics by default --- .../statsd/NonBlockingStatsDClient.java | 16 +- .../NonBlockingStatsDClientBuilder.java | 6 - .../com/timgroup/statsd/StatsDAggregator.java | 28 ++- .../java/com/timgroup/statsd/Telemetry.java | 96 +++----- .../timgroup/statsd/StatsDAggregatorTest.java | 1 - .../com/timgroup/statsd/TelemetryTest.java | 212 +++++++++--------- 6 files changed, 161 insertions(+), 198 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index dbaf8c55..9c914c41 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -89,7 +89,6 @@ String tag() { public static final int SOCKET_BUFFER_BYTES = -1; public static final boolean DEFAULT_BLOCKING = false; public static final boolean DEFAULT_ENABLE_TELEMETRY = true; - public static final boolean DEFAULT_ENABLE_DEVMODE = false; public static final boolean DEFAULT_ENABLE_AGGREGATION = false; public static final String CLIENT_TAG = "client:java"; @@ -211,8 +210,6 @@ private static String format(ThreadLocal formatter, Number value) * Boolean to enable client telemetry. * @param telemetryFlushInterval * Telemetry flush interval integer, in milliseconds. - * @param enableDevMode - * Boolean to enable client telemetry in dev-mode. * @param aggregationFlushInterval * Aggregation flush interval integer, in milliseconds. 0 disables aggregation. * @param aggregationShards @@ -225,12 +222,12 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, final S final Callable telemetryAddressLookup, final int timeout, final int bufferSize, final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers, final int senderWorkers, boolean blocking, final boolean enableTelemetry, final int telemetryFlushInterval, - final boolean enableDevMode, final int aggregationFlushInterval, final int aggregationShards) + final int aggregationFlushInterval, final int aggregationShards) throws StatsDClientException { this(prefix, queueSize, constantTags, errorHandler, addressLookup, telemetryAddressLookup, timeout, bufferSize, maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, blocking, - enableTelemetry, telemetryFlushInterval, enableDevMode, aggregationFlushInterval, aggregationShards, + enableTelemetry, telemetryFlushInterval, aggregationFlushInterval, aggregationShards, null); } @@ -242,8 +239,8 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final final Callable telemetryAddressLookup, final int timeout, final int bufferSize, final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers, final int senderWorkers, boolean blocking, final boolean enableTelemetry, final int telemetryFlushInterval, - final boolean enableDevMode, final int aggregationFlushInterval, final int aggregationShards, - final ThreadFactory customThreadFactory) throws StatsDClientException { + final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory customThreadFactory) + throws StatsDClientException { if ((prefix != null) && (!prefix.isEmpty())) { this.prefix = prefix + "."; @@ -341,7 +338,6 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final this.telemetry = new Telemetry.Builder() .tags(telemetryTags) .processor(telemetryStatsDProcessor) - .devMode(enableDevMode) .build(); statsDSender = createSender(addressLookup, handler, clientChannel, statsDProcessor.getBufferPool(), @@ -392,7 +388,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final builder.socketBufferSize, builder.maxPacketSizeBytes, builder.entityID, builder.bufferPoolSize, builder.processorWorkers, builder.senderWorkers, builder.blocking, builder.enableTelemetry, builder.telemetryFlushInterval, - builder.enableDevMode, (builder.enableAggregation ? builder.aggregationFlushInterval : 0), + (builder.enableAggregation ? builder.aggregationFlushInterval : 0), builder.aggregationShards, builder.threadFactory); } @@ -1008,7 +1004,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[ this(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout, bufferSize, maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, - blocking, enableTelemetry, telemetryFlushInterval, DEFAULT_ENABLE_DEVMODE, 0, 0); + blocking, enableTelemetry, telemetryFlushInterval, 0, 0); } protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler, diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 8f09384d..f0d75e86 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -28,7 +28,6 @@ public class NonBlockingStatsDClientBuilder implements Cloneable { public int senderWorkers = NonBlockingStatsDClient.DEFAULT_SENDER_WORKERS; public boolean blocking = NonBlockingStatsDClient.DEFAULT_BLOCKING; public boolean enableTelemetry = NonBlockingStatsDClient.DEFAULT_ENABLE_TELEMETRY; - public boolean enableDevMode = NonBlockingStatsDClient.DEFAULT_ENABLE_DEVMODE; public boolean enableAggregation = NonBlockingStatsDClient.DEFAULT_ENABLE_AGGREGATION; public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL; public int aggregationFlushInterval = StatsDAggregator.DEFAULT_FLUSH_INTERVAL; @@ -143,11 +142,6 @@ public NonBlockingStatsDClientBuilder enableTelemetry(boolean val) { return this; } - public NonBlockingStatsDClientBuilder enableDevMode(boolean val) { - enableDevMode = val; - return this; - } - public NonBlockingStatsDClientBuilder enableAggregation(boolean val) { enableAggregation = val; return this; diff --git a/src/main/java/com/timgroup/statsd/StatsDAggregator.java b/src/main/java/com/timgroup/statsd/StatsDAggregator.java index 93ec49e3..fa52bb2f 100644 --- a/src/main/java/com/timgroup/statsd/StatsDAggregator.java +++ b/src/main/java/com/timgroup/statsd/StatsDAggregator.java @@ -116,21 +116,19 @@ public boolean aggregateMessage(Message message) { if (telemetry != null) { telemetry.incrAggregatedContexts(1); - if (telemetry.getDevMode()) { - - switch (message.getType()) { - case GAUGE: - telemetry.incrAggregatedGaugeContexts(1); - break; - case COUNT: - telemetry.incrAggregatedCountContexts(1); - break; - case SET: - telemetry.incrAggregatedSetContexts(1); - break; - default: - break; - } + // developer metrics + switch (message.getType()) { + case GAUGE: + telemetry.incrAggregatedGaugeContexts(1); + break; + case COUNT: + telemetry.incrAggregatedCountContexts(1); + break; + case SET: + telemetry.incrAggregatedSetContexts(1); + break; + default: + break; } } } diff --git a/src/main/java/com/timgroup/statsd/Telemetry.java b/src/main/java/com/timgroup/statsd/Telemetry.java index 52994802..1c6b58db 100644 --- a/src/main/java/com/timgroup/statsd/Telemetry.java +++ b/src/main/java/com/timgroup/statsd/Telemetry.java @@ -41,8 +41,7 @@ public class Telemetry { protected final String aggregatedContextsByTypeMetric = "datadog.dogstatsd.client.aggregated_context_by_type"; protected String tags; - protected boolean devMode; - protected StringBuilder devModeBuilder = new StringBuilder(); + protected StringBuilder tagBuilder = new StringBuilder(); public StatsDProcessor processor; protected Timer timer; @@ -81,18 +80,16 @@ public final void writeTo(StringBuilder builder) { } } - Telemetry(final String tags, final StatsDProcessor processor, final boolean devMode) { + Telemetry(final String tags, final StatsDProcessor processor) { // precompute metrics lines with tags this.tags = tags; this.processor = processor; - this.devMode = devMode; this.timer = null; } public static class Builder { private String tags; private StatsDProcessor processor; - private boolean devMode; public Builder() {} @@ -106,13 +103,8 @@ public Builder processor(StatsDProcessor processor) { return this; } - public Builder devMode(boolean devMode) { - this.devMode = devMode; - return this; - } - public Telemetry build() { - return new Telemetry(this.tags, this.processor, this.devMode); + return new Telemetry(this.tags, this.processor); } } @@ -155,55 +147,51 @@ public void flush() { processor.send(new TelemetryMessage(this.packetsDroppedQueueMetric, this.packetsDroppedQueue.getAndSet(0), tags)); processor.send(new TelemetryMessage(this.aggregatedContextsMetric, this.aggregatedContexts.getAndSet(0), tags)); - if (devMode) { - processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.gaugeSent.getAndSet(0), - getTelemetryTags(tags, Message.Type.GAUGE))); - processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.countSent.getAndSet(0), - getTelemetryTags(tags, Message.Type.COUNT))); - processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.setSent.getAndSet(0), - getTelemetryTags(tags, Message.Type.SET))); - processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.histogramSent.getAndSet(0), - getTelemetryTags(tags, Message.Type.HISTOGRAM))); - processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.distributionSent.getAndSet(0), - getTelemetryTags(tags, Message.Type.DISTRIBUTION))); - - processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedGaugeContexts.getAndSet(0), - getTelemetryTags(tags, Message.Type.GAUGE))); - processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedCountContexts.getAndSet(0), - getTelemetryTags(tags, Message.Type.COUNT))); - processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedSetContexts.getAndSet(0), - getTelemetryTags(tags, Message.Type.SET))); - } + // developer metrics + processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.gaugeSent.getAndSet(0), + getTelemetryTags(tags, Message.Type.GAUGE))); + processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.countSent.getAndSet(0), + getTelemetryTags(tags, Message.Type.COUNT))); + processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.setSent.getAndSet(0), + getTelemetryTags(tags, Message.Type.SET))); + processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.histogramSent.getAndSet(0), + getTelemetryTags(tags, Message.Type.HISTOGRAM))); + processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.distributionSent.getAndSet(0), + getTelemetryTags(tags, Message.Type.DISTRIBUTION))); + + processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedGaugeContexts.getAndSet(0), + getTelemetryTags(tags, Message.Type.GAUGE))); + processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedCountContexts.getAndSet(0), + getTelemetryTags(tags, Message.Type.COUNT))); + processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedSetContexts.getAndSet(0), + getTelemetryTags(tags, Message.Type.SET))); } protected String getTelemetryTags(String tags, Message.Type type) { - if (!devMode) { - return tags; - } - devModeBuilder.setLength(0); - devModeBuilder.append(tags); + tagBuilder.setLength(0); + tagBuilder.append(tags); switch (type) { case GAUGE: - devModeBuilder.append(",metrics_type:gauge"); + tagBuilder.append(",metrics_type:gauge"); break; case COUNT: - devModeBuilder.append(",metrics_type:count"); + tagBuilder.append(",metrics_type:count"); break; case SET: - devModeBuilder.append(",metrics_type:set"); + tagBuilder.append(",metrics_type:set"); break; case HISTOGRAM: - devModeBuilder.append(",metrics_type:histogram"); + tagBuilder.append(",metrics_type:histogram"); break; case DISTRIBUTION: - devModeBuilder.append(",metrics_type:distribution"); + tagBuilder.append(",metrics_type:distribution"); break; default: break; } - return devModeBuilder.toString(); + return tagBuilder.toString(); } /** @@ -327,17 +315,15 @@ public void reset() { this.packetsDroppedQueue.set(0); this.aggregatedContexts.set(0); - if (devMode) { - this.gaugeSent.set(0); - this.countSent.set(0); - this.histogramSent.set(0); - this.distributionSent.set(0); - this.setSent.set(0); + this.gaugeSent.set(0); + this.countSent.set(0); + this.histogramSent.set(0); + this.distributionSent.set(0); + this.setSent.set(0); - this.aggregatedGaugeContexts.set(0); - this.aggregatedCountContexts.set(0); - this.aggregatedSetContexts.set(0); - } + this.aggregatedGaugeContexts.set(0); + this.aggregatedCountContexts.set(0); + this.aggregatedSetContexts.set(0); } /** @@ -347,12 +333,4 @@ public void reset() { public String getTags() { return this.tags; } - - /** - * Gets the dev mode setting value. - * @return this Telemetry instance dev mode setting. - */ - public boolean getDevMode() { - return this.devMode; - } } diff --git a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java index 57afc357..162d59fa 100644 --- a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java +++ b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java @@ -147,7 +147,6 @@ public static void start() throws Exception { // set telemetry Telemetry telemetry = new Telemetry.Builder() .processor(fakeProcessor) - .devMode(true) .build(); fakeProcessor.setTelemetry(telemetry); diff --git a/src/test/java/com/timgroup/statsd/TelemetryTest.java b/src/test/java/com/timgroup/statsd/TelemetryTest.java index f75b1bd0..45bd8fd2 100644 --- a/src/test/java/com/timgroup/statsd/TelemetryTest.java +++ b/src/test/java/com/timgroup/statsd/TelemetryTest.java @@ -80,11 +80,11 @@ public StatsDNonBlockingTelemetry(final String prefix, final int queueSize, Stri final int timeout, final int bufferSize, final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers, final int senderWorkers, boolean blocking, final boolean enableTelemetry, - final int telemetryFlushInterval, final boolean enableDevMode) + final int telemetryFlushInterval) throws StatsDClientException { super(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout, bufferSize, maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, - blocking, enableTelemetry, telemetryFlushInterval, enableDevMode, 0, 0); + blocking, enableTelemetry, telemetryFlushInterval, 0, 0); } }; @@ -103,12 +103,12 @@ public StatsDNonBlockingTelemetry build() throws StatsDClientException { return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler, addressLookup, timeout, socketBufferSize, packetSize, entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry, - telemetryFlushInterval, enableDevMode); + telemetryFlushInterval); } else { return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler, staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, packetSize, entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry, - telemetryFlushInterval, enableDevMode); + telemetryFlushInterval); } } } @@ -122,15 +122,14 @@ public StatsDNonBlockingTelemetry build() throws StatsDClientException { .enableTelemetry(false); // disable telemetry so we can control calls to "flush" private static StatsDNonBlockingTelemetry client = ((StatsDNonBlockingTelemetryBuilder)builder).build(); - // dev-mode client - private static final NonBlockingStatsDClientBuilder devModeBuilder = new StatsDNonBlockingTelemetryBuilder() + // telemetry client + private static final NonBlockingStatsDClientBuilder telemetryBuilder = new StatsDNonBlockingTelemetryBuilder() .prefix("my.prefix") .hostname("localhost") .constantTags("test") .port(STATSD_SERVER_PORT) - .enableTelemetry(false) // disable telemetry so we can control calls to "flush" - .enableDevMode(true); - private static StatsDNonBlockingTelemetry devModeClient = ((StatsDNonBlockingTelemetryBuilder)devModeBuilder).build(); + .enableTelemetry(false); // disable telemetry so we can control calls to "flush" + private static StatsDNonBlockingTelemetry telemetryClient = ((StatsDNonBlockingTelemetryBuilder)telemetryBuilder).build(); // builderError fails to send any data on the network, producing packets dropped private static final NonBlockingStatsDClientBuilder builderError = new StatsDNonBlockingTelemetryBuilder() @@ -158,7 +157,7 @@ public static void start() throws IOException, Exception { server = new DummyStatsDServer(STATSD_SERVER_PORT); fakeProcessor = new FakeProcessor(NO_OP_HANDLER); client.telemetry.processor = fakeProcessor; - devModeClient.telemetry.processor = fakeProcessor; + telemetryClient.telemetry.processor = fakeProcessor; telemetryTags = computeTelemetryTags(); } @@ -179,74 +178,73 @@ public void clear() { server.clear(); client.telemetry.reset(); clientError.telemetry.reset(); - devModeClient.telemetry.reset(); + telemetryClient.telemetry.reset(); fakeProcessor.clear(); } @Test(timeout = 5000L) public void telemetry_incrManuallyIncrData() throws Exception { - devModeClient.telemetry.incrMetricsSent(1); - devModeClient.telemetry.incrGaugeSent(1); - devModeClient.telemetry.incrCountSent(1); - devModeClient.telemetry.incrSetSent(1); - devModeClient.telemetry.incrHistogramSent(1); - devModeClient.telemetry.incrDistributionSent(1); - devModeClient.telemetry.incrMetricsSent(1, Message.Type.GAUGE); // adds to metricsSent - devModeClient.telemetry.incrMetricsSent(1, Message.Type.COUNT); // adds to metricsSent - devModeClient.telemetry.incrMetricsSent(1, Message.Type.SET); // adds to metricsSent - devModeClient.telemetry.incrMetricsSent(1, Message.Type.HISTOGRAM); // adds to metricsSent - devModeClient.telemetry.incrMetricsSent(1, Message.Type.DISTRIBUTION); // adds to metricsSent - devModeClient.telemetry.incrEventsSent(2); - devModeClient.telemetry.incrServiceChecksSent(3); - devModeClient.telemetry.incrBytesSent(4); - devModeClient.telemetry.incrBytesDropped(5); - devModeClient.telemetry.incrPacketSent(6); - devModeClient.telemetry.incrPacketDropped(7); - devModeClient.telemetry.incrPacketDroppedQueue(8); - devModeClient.telemetry.incrAggregatedContexts(9); - devModeClient.telemetry.incrAggregatedGaugeContexts(10); - devModeClient.telemetry.incrAggregatedCountContexts(11); - devModeClient.telemetry.incrAggregatedSetContexts(12); - - assertThat(devModeClient.telemetry.getDevMode(), equalTo(true)); - assertThat(devModeClient.telemetry.metricsSent.get(), equalTo(6)); - assertThat(devModeClient.telemetry.gaugeSent.get(), equalTo(2)); - assertThat(devModeClient.telemetry.countSent.get(), equalTo(2)); - assertThat(devModeClient.telemetry.setSent.get(), equalTo(2)); - assertThat(devModeClient.telemetry.histogramSent.get(), equalTo(2)); - assertThat(devModeClient.telemetry.distributionSent.get(), equalTo(2)); - assertThat(devModeClient.telemetry.eventsSent.get(), equalTo(2)); - assertThat(devModeClient.telemetry.serviceChecksSent.get(), equalTo(3)); - assertThat(devModeClient.telemetry.bytesSent.get(), equalTo(4)); - assertThat(devModeClient.telemetry.bytesDropped.get(), equalTo(5)); - assertThat(devModeClient.telemetry.packetsSent.get(), equalTo(6)); - assertThat(devModeClient.telemetry.packetsDropped.get(), equalTo(7)); - assertThat(devModeClient.telemetry.packetsDroppedQueue.get(), equalTo(8)); - assertThat(devModeClient.telemetry.aggregatedContexts.get(), equalTo(9)); - assertThat(devModeClient.telemetry.aggregatedGaugeContexts.get(), equalTo(10)); - assertThat(devModeClient.telemetry.aggregatedCountContexts.get(), equalTo(11)); - assertThat(devModeClient.telemetry.aggregatedSetContexts.get(), equalTo(12)); - - devModeClient.telemetry.flush(); - - assertThat(devModeClient.telemetry.metricsSent.get(), equalTo(0)); - assertThat(devModeClient.telemetry.gaugeSent.get(), equalTo(0)); - assertThat(devModeClient.telemetry.countSent.get(), equalTo(0)); - assertThat(devModeClient.telemetry.setSent.get(), equalTo(0)); - assertThat(devModeClient.telemetry.histogramSent.get(), equalTo(0)); - assertThat(devModeClient.telemetry.distributionSent.get(), equalTo(0)); - assertThat(devModeClient.telemetry.eventsSent.get(), equalTo(0)); - assertThat(devModeClient.telemetry.serviceChecksSent.get(), equalTo(0)); - assertThat(devModeClient.telemetry.bytesSent.get(), equalTo(0)); - assertThat(devModeClient.telemetry.bytesDropped.get(), equalTo(0)); - assertThat(devModeClient.telemetry.packetsSent.get(), equalTo(0)); - assertThat(devModeClient.telemetry.packetsDropped.get(), equalTo(0)); - assertThat(devModeClient.telemetry.packetsDroppedQueue.get(), equalTo(0)); - assertThat(devModeClient.telemetry.aggregatedContexts.get(), equalTo(0)); - assertThat(devModeClient.telemetry.aggregatedGaugeContexts.get(), equalTo(0)); - assertThat(devModeClient.telemetry.aggregatedCountContexts.get(), equalTo(0)); - assertThat(devModeClient.telemetry.aggregatedSetContexts.get(), equalTo(0)); + telemetryClient.telemetry.incrMetricsSent(1); + telemetryClient.telemetry.incrGaugeSent(1); + telemetryClient.telemetry.incrCountSent(1); + telemetryClient.telemetry.incrSetSent(1); + telemetryClient.telemetry.incrHistogramSent(1); + telemetryClient.telemetry.incrDistributionSent(1); + telemetryClient.telemetry.incrMetricsSent(1, Message.Type.GAUGE); // adds to metricsSent + telemetryClient.telemetry.incrMetricsSent(1, Message.Type.COUNT); // adds to metricsSent + telemetryClient.telemetry.incrMetricsSent(1, Message.Type.SET); // adds to metricsSent + telemetryClient.telemetry.incrMetricsSent(1, Message.Type.HISTOGRAM); // adds to metricsSent + telemetryClient.telemetry.incrMetricsSent(1, Message.Type.DISTRIBUTION); // adds to metricsSent + telemetryClient.telemetry.incrEventsSent(2); + telemetryClient.telemetry.incrServiceChecksSent(3); + telemetryClient.telemetry.incrBytesSent(4); + telemetryClient.telemetry.incrBytesDropped(5); + telemetryClient.telemetry.incrPacketSent(6); + telemetryClient.telemetry.incrPacketDropped(7); + telemetryClient.telemetry.incrPacketDroppedQueue(8); + telemetryClient.telemetry.incrAggregatedContexts(9); + telemetryClient.telemetry.incrAggregatedGaugeContexts(10); + telemetryClient.telemetry.incrAggregatedCountContexts(11); + telemetryClient.telemetry.incrAggregatedSetContexts(12); + + assertThat(telemetryClient.telemetry.metricsSent.get(), equalTo(6)); + assertThat(telemetryClient.telemetry.gaugeSent.get(), equalTo(2)); + assertThat(telemetryClient.telemetry.countSent.get(), equalTo(2)); + assertThat(telemetryClient.telemetry.setSent.get(), equalTo(2)); + assertThat(telemetryClient.telemetry.histogramSent.get(), equalTo(2)); + assertThat(telemetryClient.telemetry.distributionSent.get(), equalTo(2)); + assertThat(telemetryClient.telemetry.eventsSent.get(), equalTo(2)); + assertThat(telemetryClient.telemetry.serviceChecksSent.get(), equalTo(3)); + assertThat(telemetryClient.telemetry.bytesSent.get(), equalTo(4)); + assertThat(telemetryClient.telemetry.bytesDropped.get(), equalTo(5)); + assertThat(telemetryClient.telemetry.packetsSent.get(), equalTo(6)); + assertThat(telemetryClient.telemetry.packetsDropped.get(), equalTo(7)); + assertThat(telemetryClient.telemetry.packetsDroppedQueue.get(), equalTo(8)); + assertThat(telemetryClient.telemetry.aggregatedContexts.get(), equalTo(9)); + assertThat(telemetryClient.telemetry.aggregatedGaugeContexts.get(), equalTo(10)); + assertThat(telemetryClient.telemetry.aggregatedCountContexts.get(), equalTo(11)); + assertThat(telemetryClient.telemetry.aggregatedSetContexts.get(), equalTo(12)); + + telemetryClient.telemetry.flush(); + + assertThat(telemetryClient.telemetry.metricsSent.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.gaugeSent.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.countSent.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.setSent.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.histogramSent.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.distributionSent.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.eventsSent.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.serviceChecksSent.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.bytesSent.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.bytesDropped.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.packetsSent.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.packetsDropped.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.packetsDroppedQueue.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.aggregatedContexts.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.aggregatedGaugeContexts.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.aggregatedCountContexts.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.aggregatedSetContexts.get(), equalTo(0)); List statsdMessages = fakeProcessor.getMessagesAsStrings() ; @@ -255,23 +253,23 @@ public void telemetry_incrManuallyIncrData() throws Exception { assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.GAUGE) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.GAUGE) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.COUNT) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.COUNT) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.HISTOGRAM) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.HISTOGRAM) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.DISTRIBUTION) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.DISTRIBUTION) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.events:2|c|#test," + telemetryTags + "\n")); @@ -299,15 +297,15 @@ public void telemetry_incrManuallyIncrData() throws Exception { assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.aggregated_context_by_type:10|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.GAUGE) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.GAUGE) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.aggregated_context_by_type:11|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.COUNT) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.COUNT) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.aggregated_context_by_type:12|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); } @Test(timeout = 5000L) @@ -468,54 +466,54 @@ public void telemetry_SentData() throws Exception { public void telemetry_DevModeData() throws Exception { - devModeClient.gauge("gauge", 24); - devModeClient.count("count", 1); - devModeClient.histogram("histo", 1); - devModeClient.distribution("distro", 1); + telemetryClient.gauge("gauge", 24); + telemetryClient.count("count", 1); + telemetryClient.histogram("histo", 1); + telemetryClient.distribution("distro", 1); // leaving time to the server to flush metrics (equivalent to waitForMessage) - while (devModeClient.telemetry.metricsSent.get() == 0 - || devModeClient.telemetry.packetsSent.get() == 0 - || devModeClient.telemetry.bytesSent.get() == 0) { + while (telemetryClient.telemetry.metricsSent.get() == 0 + || telemetryClient.telemetry.packetsSent.get() == 0 + || telemetryClient.telemetry.bytesSent.get() == 0) { try { Thread.sleep(50L); } catch (InterruptedException e) {} } - assertThat(devModeClient.telemetry.metricsSent.get(), equalTo(4)); - assertThat(devModeClient.telemetry.gaugeSent.get(), equalTo(1)); - assertThat(devModeClient.telemetry.countSent.get(), equalTo(1)); - assertThat(devModeClient.telemetry.setSent.get(), equalTo(0)); - assertThat(devModeClient.telemetry.histogramSent.get(), equalTo(1)); - assertThat(devModeClient.telemetry.distributionSent.get(), equalTo(1)); - assertThat(devModeClient.telemetry.packetsSent.get(), equalTo(1)); - assertThat(devModeClient.telemetry.bytesSent.get(), equalTo(106)); + assertThat(telemetryClient.telemetry.metricsSent.get(), equalTo(4)); + assertThat(telemetryClient.telemetry.gaugeSent.get(), equalTo(1)); + assertThat(telemetryClient.telemetry.countSent.get(), equalTo(1)); + assertThat(telemetryClient.telemetry.setSent.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.histogramSent.get(), equalTo(1)); + assertThat(telemetryClient.telemetry.distributionSent.get(), equalTo(1)); + assertThat(telemetryClient.telemetry.packetsSent.get(), equalTo(1)); + assertThat(telemetryClient.telemetry.bytesSent.get(), equalTo(106)); // Start flush timer with a 50ms interval - devModeClient.telemetry.start(50L); + telemetryClient.telemetry.start(50L); // Wait for the flush to happen - while (devModeClient.telemetry.metricsSent.get() != 0) { + while (telemetryClient.telemetry.metricsSent.get() != 0) { try { Thread.sleep(30L); } catch (InterruptedException e) {} } - devModeClient.telemetry.stop(); + telemetryClient.telemetry.stop(); - assertThat(devModeClient.telemetry.metricsSent.get(), equalTo(0)); + assertThat(telemetryClient.telemetry.metricsSent.get(), equalTo(0)); List statsdMessages = fakeProcessor.getMessagesAsStrings(); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics:4|c|#test," + telemetryTags + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:1|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.GAUGE) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.GAUGE) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:1|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.COUNT) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.COUNT) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:0|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:1|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.HISTOGRAM) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.HISTOGRAM) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:1|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.DISTRIBUTION) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.DISTRIBUTION) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.events:0|c|#test," + telemetryTags + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.service_checks:0|c|#test," + telemetryTags + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_sent:106|c|#test," + telemetryTags + "\n")); @@ -526,10 +524,10 @@ public void telemetry_DevModeData() throws Exception { // aggregation is disabled assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.aggregated_context:0|c|#test," + telemetryTags + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.aggregated_context_by_type:0|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.GAUGE) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.GAUGE) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.aggregated_context_by_type:0|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.COUNT) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.COUNT) + "\n")); assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.aggregated_context_by_type:0|c|#test," + - devModeClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); } } From d04997dbb7cf00836b7a4efdb1bb4a6f612f9238 Mon Sep 17 00:00:00 2001 From: Jaime Fullaondo Date: Wed, 15 Sep 2021 13:49:20 -0500 Subject: [PATCH 2/2] [telemetry] test: fix tests broken after dev mode removal. --- .../com/timgroup/statsd/NonBlockingStatsDClientTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index ec972751..9c6f690f 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -1053,7 +1053,7 @@ public void sends_telemetry_elsewhere() throws Exception { // 8 messages in telemetry batch final List messages = telemetryServer.messagesReceived(); - assertEquals(9, messages.size()); + assertEquals(17, messages.size()); assertThat(messages, hasItem(startsWith("datadog.dogstatsd.client.metrics:1|c"))); assertThat(messages, hasItem(startsWith("datadog.dogstatsd.client.events:0|c"))); assertThat(messages, hasItem(startsWith("datadog.dogstatsd.client.service_checks:0|c"))); @@ -1225,7 +1225,7 @@ public void testAggregationTelemetry() throws Exception { List messages = server.messagesReceived(); - assertThat(messages.size(), comparesEqualTo(3+9)); + assertThat(messages.size(), comparesEqualTo(3+17)); assertThat(messages, hasItem(startsWith("datadog.dogstatsd.client.aggregated_context:27|c"))); } finally { @@ -1300,7 +1300,7 @@ private static class SlowStatsDNonBlockingStatsDClient extends NonBlockingStatsD final int senderWorkers, boolean blocking) throws StatsDClientException { super(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout,bufferSize, - maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, blocking, false, 0, false, 0, 0); + maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, blocking, false, 0, 0, 0); lock = new CountDownLatch(1); }