diff --git a/src/main/java/com/timgroup/statsd/Message.java b/src/main/java/com/timgroup/statsd/Message.java new file mode 100644 index 00000000..33a87667 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/Message.java @@ -0,0 +1,13 @@ +package com.timgroup.statsd; + +interface Message { + /** + * Write this message to the provided {@link StringBuilder}. Will + * be called from the sender threads. + * + * @param builder + * StringBuilder the text representation will be written to. + */ + void writeTo(StringBuilder builder); +} + diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 10a8825f..174bb814 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -1,5 +1,7 @@ package com.timgroup.statsd; +import com.timgroup.statsd.Message; + import jnr.unixsocket.UnixDatagramChannel; import jnr.unixsocket.UnixSocketAddress; import jnr.unixsocket.UnixSocketOptions; @@ -98,50 +100,52 @@ String tag() { }; /** - * Because NumberFormat is not thread-safe we cannot share instances across threads. Use a ThreadLocal to - * create one pre thread as this seems to offer a significant performance improvement over creating one per-thread: - * http://stackoverflow.com/a/1285297/2648 - * https://github.com/indeedeng/java-dogstatsd-client/issues/4 + * The NumberFormat instances are not threadsafe and thus defined as ThreadLocal + * for safety. */ - private static final ThreadLocal NUMBER_FORMATTERS = new ThreadLocal() { + private static final ThreadLocal NUMBER_FORMATTER = new ThreadLocal() { @Override protected NumberFormat initialValue() { - - // Always create the formatter for the US locale in order to avoid this bug: - // https://github.com/indeedeng/java-dogstatsd-client/issues/3 - final NumberFormat numberFormatter = NumberFormat.getInstance(Locale.US); - numberFormatter.setGroupingUsed(false); - numberFormatter.setMaximumFractionDigits(6); - - // we need to specify a value for Double.NaN that is recognized by dogStatsD - if (numberFormatter instanceof DecimalFormat) { // better safe than a runtime error - final DecimalFormat decimalFormat = (DecimalFormat) numberFormatter; - final DecimalFormatSymbols symbols = decimalFormat.getDecimalFormatSymbols(); - symbols.setNaN("NaN"); - decimalFormat.setDecimalFormatSymbols(symbols); - } - - return numberFormatter; + return newFormatter(false); } }; - - private static final ThreadLocal SAMPLE_RATE_FORMATTERS = new ThreadLocal() { + private static final ThreadLocal SAMPLE_RATE_FORMATTER = new ThreadLocal() { @Override protected NumberFormat initialValue() { - final NumberFormat numberFormatter = NumberFormat.getInstance(Locale.US); - numberFormatter.setGroupingUsed(false); - numberFormatter.setMinimumFractionDigits(6); - - if (numberFormatter instanceof DecimalFormat) { - final DecimalFormat decimalFormat = (DecimalFormat) numberFormatter; - final DecimalFormatSymbols symbols = decimalFormat.getDecimalFormatSymbols(); - symbols.setNaN("NaN"); - decimalFormat.setDecimalFormatSymbols(symbols); - } - return numberFormatter; + return newFormatter(true); } }; + static { + } + + private static NumberFormat newFormatter(boolean sampler) { + // Always create the formatter for the US locale in order to avoid this bug: + // https://github.com/indeedeng/java-dogstatsd-client/issues/3 + NumberFormat numberFormatter = NumberFormat.getInstance(Locale.US); + numberFormatter.setGroupingUsed(false); + + // we need to specify a value for Double.NaN that is recognized by dogStatsD + if (numberFormatter instanceof DecimalFormat) { // better safe than a runtime error + final DecimalFormat decimalFormat = (DecimalFormat) numberFormatter; + final DecimalFormatSymbols symbols = decimalFormat.getDecimalFormatSymbols(); + symbols.setNaN("NaN"); + decimalFormat.setDecimalFormatSymbols(symbols); + } + + if (sampler) { + numberFormatter.setMinimumFractionDigits(6); + } else { + numberFormatter.setMaximumFractionDigits(6); + } + + return numberFormatter; + } + + private static String format(ThreadLocal formatter, double value) { + return formatter.get().format(value); + } + private final String prefix; private final DatagramChannel clientChannel; private final StatsDClientErrorHandler handler; @@ -200,6 +204,10 @@ protected NumberFormat initialValue() { * The number of sender worker threads submitting buffers to the socket. * @param blocking * Blocking or non-blocking implementation for statsd message queue. + * @param enableTelemetry + * Should telemetry be enabled for the client. + * @param telemetryFlushInterval + * Telemetry flush interval in seconds when the feature is enabled. * @throws StatsDClientException * if the client could not be started */ @@ -211,7 +219,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[ final int telemetryFlushInterval) throws StatsDClientException { if ((prefix != null) && (!prefix.isEmpty())) { - this.prefix = new StringBuilder(prefix).append(".").toString(); + this.prefix = prefix + "."; } else { this.prefix = ""; } @@ -239,7 +247,8 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[ if (costantPreTags.isEmpty()) { constantTagsRendered = null; } else { - constantTagsRendered = tagString(costantPreTags.toArray(new String[costantPreTags.size()]), null); + constantTagsRendered = tagString( + costantPreTags.toArray(new String[costantPreTags.size()]), null, new StringBuilder()).toString(); } costantPreTags = null; } @@ -270,7 +279,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[ String telemetrytags = tagString(new String[]{CLIENT_TRANSPORT_TAG + transportType, CLIENT_VERSION_TAG + properties.getProperty("dogstatsd_client_version"), - CLIENT_TAG}); + CLIENT_TAG}, new StringBuilder()).toString(); this.telemetry = new Telemetry(telemetrytags, statsDProcessor); @@ -348,35 +357,106 @@ public void close() { * Return tag list as a tag string. * Generate a suffix conveying the given tag list to the client */ - static String tagString(final String[] tags, final String tagPrefix) { - final StringBuilder sb; + static StringBuilder tagString(final String[] tags, final String tagPrefix, final StringBuilder sb) { if (tagPrefix != null) { + sb.append(tagPrefix); if ((tags == null) || (tags.length == 0)) { - return tagPrefix; + return sb; } - sb = new StringBuilder(tagPrefix); - sb.append(","); + sb.append(','); } else { if ((tags == null) || (tags.length == 0)) { - return ""; + return sb; } - sb = new StringBuilder("|#"); + sb.append("|#"); } for (int n = tags.length - 1; n >= 0; n--) { sb.append(tags[n]); if (n > 0) { - sb.append(","); + sb.append(','); } } - return sb.toString(); + return sb; } /** * Generate a suffix conveying the given tag list to the client. */ - String tagString(final String[] tags) { - return tagString(tags, constantTagsRendered); + StringBuilder tagString(final String[] tags, StringBuilder builder) { + return tagString(tags, constantTagsRendered, builder); + } + + abstract class StatsDMessage implements Message { + final String aspect; + final String type; + final double sampleRate; // NaN for none + final String[] tags; + + protected StatsDMessage(String aspect, String type, double sampleRate, String[] tags) { + this.aspect = aspect; + this.type = type; + this.sampleRate = sampleRate; + this.tags = tags; + } + + @Override + public final void writeTo(StringBuilder builder) { + builder.append(prefix).append(aspect).append(':'); + writeValue(builder); + builder.append('|').append(type); + if (!Double.isNaN(sampleRate)) { + builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate)); + } + tagString(tags, builder); + } + + protected abstract void writeValue(StringBuilder builder); + } + + + private void sendMetric(final Message message) { + send(message); + this.telemetry.incrMetricsSent(1); + } + + private void send(final Message message) { + if (!statsDProcessor.send(message)) { + this.telemetry.incrPacketDroppedQueue(1); + } + } + + // send double with sample rate + private void send(String aspect, final double value, String type, double sampleRate, String[] tags) { + if (Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) { + + sendMetric(new StatsDMessage(aspect, type, sampleRate, tags) { + @Override protected void writeValue(StringBuilder builder) { + builder.append(format(NUMBER_FORMATTER, value)); + } + }); + } + } + + // send double without sample rate + private void send(String aspect, final double value, String type, String[] tags) { + send(aspect, value, type, Double.NaN, tags); + } + + // send long with sample rate + private void send(String aspect, final long value, String type, double sampleRate, String[] tags) { + if (Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) { + sendMetric(new StatsDMessage(aspect, type, sampleRate, tags) { + @Override protected void writeValue(StringBuilder builder) { + builder.append(value); + } + }); + } + } + + // send long without sample rate + private void send(String aspect, final long value, String type, String[] tags) { + send(aspect, value, type, Double.NaN, tags); } /** @@ -393,13 +473,7 @@ String tagString(final String[] tags) { */ @Override public void count(final String aspect, final long delta, final String... tags) { - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(delta) - .append("|c") - .append(tagString(tags)) - .toString()); + send(aspect, delta, "c", tags); } /** @@ -407,17 +481,7 @@ public void count(final String aspect, final long delta, final String... tags) { */ @Override public void count(final String aspect, final long delta, final double sampleRate, final String...tags) { - if (isInvalidSample(sampleRate)) { - return; - } - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(delta) - .append("|c|@") - .append(SAMPLE_RATE_FORMATTERS.get().format(sampleRate)) - .append(tagString(tags)) - .toString()); + send(aspect, delta, "c", sampleRate, tags); } /** @@ -434,13 +498,7 @@ public void count(final String aspect, final long delta, final double sampleRate */ @Override public void count(final String aspect, final double delta, final String... tags) { - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(NUMBER_FORMATTERS.get().format(delta)) - .append("|c") - .append(tagString(tags)) - .toString()); + send(aspect, delta, "c", tags); } /** @@ -448,17 +506,7 @@ public void count(final String aspect, final double delta, final String... tags) */ @Override public void count(final String aspect, final double delta, final double sampleRate, final String...tags) { - if (isInvalidSample(sampleRate)) { - return; - } - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(NUMBER_FORMATTERS.get().format(delta)) - .append("|c|@") - .append(SAMPLE_RATE_FORMATTERS.get().format(sampleRate)) - .append(tagString(tags)) - .toString()); + send(aspect, delta, "c", sampleRate, tags); } /** @@ -553,15 +601,7 @@ public void decrement(final String aspect, final double sampleRate, final String */ @Override public void recordGaugeValue(final String aspect, final double value, final String... tags) { - /* Intentionally using %s rather than %f here to avoid - * padding with extra 0s to represent precision */ - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(NUMBER_FORMATTERS.get().format(value)) - .append("|g") - .append(tagString(tags)) - .toString()); + send(aspect, value, "g", tags); } /** @@ -569,17 +609,7 @@ public void recordGaugeValue(final String aspect, final double value, final Stri */ @Override public void recordGaugeValue(final String aspect, final double value, final double sampleRate, final String... tags) { - if (isInvalidSample(sampleRate)) { - return; - } - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(NUMBER_FORMATTERS.get().format(value)) - .append("|g|@") - .append(SAMPLE_RATE_FORMATTERS.get().format(sampleRate)) - .append(tagString(tags)) - .toString()); + send(aspect, value, "g", sampleRate, tags); } /** @@ -596,13 +626,7 @@ public void recordGaugeValue(final String aspect, final double value, final doub */ @Override public void recordGaugeValue(final String aspect, final long value, final String... tags) { - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(value) - .append("|g") - .append(tagString(tags)) - .toString()); + send(aspect, value, "g", tags); } /** @@ -610,17 +634,7 @@ public void recordGaugeValue(final String aspect, final long value, final String */ @Override public void recordGaugeValue(final String aspect, final long value, final double sampleRate, final String... tags) { - if (isInvalidSample(sampleRate)) { - return; - } - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(value) - .append("|g|@") - .append(SAMPLE_RATE_FORMATTERS.get().format(sampleRate)) - .append(tagString(tags)) - .toString()); + send(aspect, value, "g", sampleRate, tags); } /** @@ -670,13 +684,7 @@ public void gauge(final String aspect, final long value, final double sampleRate */ @Override public void recordExecutionTime(final String aspect, final long timeInMs, final String... tags) { - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(timeInMs) - .append("|ms") - .append(tagString(tags)) - .toString()); + send(aspect, timeInMs, "ms", tags); } /** @@ -684,17 +692,7 @@ public void recordExecutionTime(final String aspect, final long timeInMs, final */ @Override public void recordExecutionTime(final String aspect, final long timeInMs, final double sampleRate, final String... tags) { - if (isInvalidSample(sampleRate)) { - return; - } - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(timeInMs) - .append("|ms|@") - .append(SAMPLE_RATE_FORMATTERS.get().format(sampleRate)) - .append(tagString(tags)) - .toString()); + send(aspect, timeInMs, "ms", sampleRate, tags); } /** @@ -727,15 +725,7 @@ public void time(final String aspect, final long value, final double sampleRate, */ @Override public void recordHistogramValue(final String aspect, final double value, final String... tags) { - /* Intentionally using %s rather than %f here to avoid - * padding with extra 0s to represent precision */ - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(NUMBER_FORMATTERS.get().format(value)) - .append("|h") - .append(tagString(tags)) - .toString()); + send(aspect, value, "h", tags); } /** @@ -743,19 +733,7 @@ public void recordHistogramValue(final String aspect, final double value, final */ @Override public void recordHistogramValue(final String aspect, final double value, final double sampleRate, final String... tags) { - if (isInvalidSample(sampleRate)) { - return; - } - /* Intentionally using %s rather than %f here to avoid - * padding with extra 0s to represent precision */ - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(NUMBER_FORMATTERS.get().format(value)) - .append("|h|@") - .append(SAMPLE_RATE_FORMATTERS.get().format(sampleRate)) - .append(tagString(tags)) - .toString()); + send(aspect, value, "h", sampleRate, tags); } /** @@ -772,13 +750,7 @@ public void recordHistogramValue(final String aspect, final double value, final */ @Override public void recordHistogramValue(final String aspect, final long value, final String... tags) { - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(value) - .append("|h") - .append(tagString(tags)) - .toString()); + send(aspect, value, "h", tags); } /** @@ -786,17 +758,7 @@ public void recordHistogramValue(final String aspect, final long value, final St */ @Override public void recordHistogramValue(final String aspect, final long value, final double sampleRate, final String... tags) { - if (isInvalidSample(sampleRate)) { - return; - } - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(value) - .append("|h|@") - .append(SAMPLE_RATE_FORMATTERS.get().format(sampleRate)) - .append(tagString(tags)) - .toString()); + send(aspect, value, "h", sampleRate, tags); } /** @@ -847,15 +809,7 @@ public void histogram(final String aspect, final long value, final double sample */ @Override public void recordDistributionValue(final String aspect, final double value, final String... tags) { - /* Intentionally using %s rather than %f here to avoid - * padding with extra 0s to represent precision */ - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(NUMBER_FORMATTERS.get().format(value)) - .append("|d") - .append(tagString(tags)) - .toString()); + send(aspect, value, "d", tags); } /** @@ -863,19 +817,7 @@ public void recordDistributionValue(final String aspect, final double value, fin */ @Override public void recordDistributionValue(final String aspect, final double value, final double sampleRate, final String... tags) { - if (isInvalidSample(sampleRate)) { - return; - } - /* Intentionally using %s rather than %f here to avoid - * padding with extra 0s to represent precision */ - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(NUMBER_FORMATTERS.get().format(value)) - .append("|d|@") - .append(SAMPLE_RATE_FORMATTERS.get().format(sampleRate)) - .append(tagString(tags)) - .toString()); + send(aspect, value, "d", sampleRate, tags); } /** @@ -894,13 +836,7 @@ public void recordDistributionValue(final String aspect, final double value, fin */ @Override public void recordDistributionValue(final String aspect, final long value, final String... tags) { - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(value) - .append("|d") - .append(tagString(tags)) - .toString()); + send(aspect, value, "d", tags); } /** @@ -908,20 +844,9 @@ public void recordDistributionValue(final String aspect, final long value, final */ @Override public void recordDistributionValue(final String aspect, final long value, final double sampleRate, final String... tags) { - if (isInvalidSample(sampleRate)) { - return; - } - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(value) - .append("|d|@") - .append(SAMPLE_RATE_FORMATTERS.get().format(sampleRate)) - .append(tagString(tags)) - .toString()); + send(aspect, value, "d", sampleRate, tags); } - /** * Convenience method equivalent to {@link #recordDistributionValue(String, double, String[])}. */ @@ -954,9 +879,7 @@ public void distribution(final String aspect, final long value, final double sam recordDistributionValue(aspect, value, sampleRate, tags); } - private String eventMap(final Event event) { - final StringBuilder res = new StringBuilder(""); - + private StringBuilder eventMap(final Event event, StringBuilder res) { final long millisSinceEpoch = event.getMillisSinceEpoch(); if (millisSinceEpoch != -1) { res.append("|d:").append(millisSinceEpoch / 1000); @@ -987,7 +910,7 @@ private String eventMap(final Event event) { res.append("|s:").append(sourceTypeName); } - return res.toString(); + return res; } /** @@ -1007,14 +930,20 @@ private String eventMap(final Event event) { */ @Override public void recordEvent(final Event event, final String... tags) { - final String title = escapeEventString(prefix + event.getTitle()); - final String text = escapeEventString(event.getText()); - send(new StringBuilder("_e{").append(title.length()).append(",").append(text.length()).append("}:").append(title) - .append("|").append(text).append(eventMap(event)).append(tagString(tags)).toString()); + statsDProcessor.send(new Message() { + @Override public void writeTo(StringBuilder builder) { + final String title = escapeEventString(prefix + event.getTitle()); + final String text = escapeEventString(event.getText()); + builder.append("_e{").append(title.length()).append(",").append(text.length()).append("}:").append(title) + .append("|").append(text); + eventMap(event, builder); + tagString(tags, builder); + } + }); this.telemetry.incrEventsSent(1); } - private String escapeEventString(final String title) { + private static String escapeEventString(final String title) { return title.replace("\n", "\\n"); } @@ -1030,7 +959,22 @@ private String escapeEventString(final String title) { */ @Override public void recordServiceCheckRun(final ServiceCheck sc) { - send(toStatsDString(sc)); + statsDProcessor.send(new Message() { + @Override public void writeTo(StringBuilder sb) { + // see http://docs.datadoghq.com/guides/dogstatsd/#service-checks + sb.append("_sc|").append(sc.getName()).append("|").append(sc.getStatus()); + if (sc.getTimestamp() > 0) { + sb.append("|d:").append(sc.getTimestamp()); + } + if (sc.getHostname() != null) { + sb.append("|h:").append(sc.getHostname()); + } + tagString(sc.getTags(), sb); + if (sc.getMessage() != null) { + sb.append("|m:").append(sc.getEscapedMessage()); + } + } + }); this.telemetry.incrServiceChecksSent(1); } @@ -1056,23 +1000,6 @@ private static boolean updateTagsWithEntityID(final List tags, String en return false; } - private String toStatsDString(final ServiceCheck sc) { - // see http://docs.datadoghq.com/guides/dogstatsd/#service-checks - final StringBuilder sb = new StringBuilder(); - sb.append("_sc|").append(sc.getName()).append("|").append(sc.getStatus()); - if (sc.getTimestamp() > 0) { - sb.append("|d:").append(sc.getTimestamp()); - } - if (sc.getHostname() != null) { - sb.append("|h:").append(sc.getHostname()); - } - sb.append(tagString(sc.getTags())); - if (sc.getMessage() != null) { - sb.append("|m:").append(sc.getEscapedMessage()); - } - return sb.toString(); - } - /** * Convenience method equivalent to {@link #recordServiceCheckRun(ServiceCheck sc)}. */ @@ -1105,24 +1032,11 @@ public void serviceCheck(final ServiceCheck sc) { public void recordSetValue(final String aspect, final String value, final String... tags) { // documentation is light, but looking at dogstatsd source, we can send string values // here instead of numbers - sendMetric(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(value) - .append("|s") - .append(tagString(tags)) - .toString()); - } - - private void sendMetric(final String message) { - send(message); - this.telemetry.incrMetricsSent(1); - } - - private void send(final String message) { - if (!statsDProcessor.send(message)) { - this.telemetry.incrPacketDroppedQueue(1); - } + statsDProcessor.send(new StatsDMessage(aspect, "s", Double.NaN, tags) { + @Override protected void writeValue(StringBuilder builder) { + builder.append(value); + } + }); } private boolean isInvalidSample(double sampleRate) { diff --git a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java index 147486f4..6687a0ed 100644 --- a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java @@ -1,25 +1,103 @@ package com.timgroup.statsd; -import java.nio.ByteBuffer; +import com.timgroup.statsd.Message; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; + public class StatsDBlockingProcessor extends StatsDProcessor { - private final BlockingQueue messages; + private final BlockingQueue messages; + + private class ProcessingTask extends StatsDProcessor.ProcessingTask { + + @Override + public void run() { + boolean empty; + ByteBuffer sendBuffer; + + try { + sendBuffer = bufferPool.borrow(); + } catch (final InterruptedException e) { + handler.handle(e); + return; + } + + while (!(shutdown && messages.isEmpty())) { + + try { + + final Message message = messages.poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS); + if (message != null) { + + builder.setLength(0); + + message.writeTo(builder); + int lowerBoundSize = builder.length(); + + if (sendBuffer.capacity() < lowerBoundSize) { + throw new InvalidMessageException(MESSAGE_TOO_LONG, builder.toString()); + } + + if (sendBuffer.remaining() < (lowerBoundSize + 1)) { + outboundQueue.put(sendBuffer); + sendBuffer = bufferPool.borrow(); + } + + sendBuffer.mark(); + if (sendBuffer.position() > 0) { + sendBuffer.put((byte) '\n'); + } + + try { + writeBuilderToSendBuffer(sendBuffer); + } catch (BufferOverflowException boe) { + outboundQueue.put(sendBuffer); + sendBuffer = bufferPool.borrow(); + writeBuilderToSendBuffer(sendBuffer); + } + + if (null == messages.peek()) { + outboundQueue.put(sendBuffer); + sendBuffer = bufferPool.borrow(); + } + } + } catch (final InterruptedException e) { + if (shutdown) { + endSignal.countDown(); + return; + } + } catch (final Exception e) { + handler.handle(e); + } + } + + builder.setLength(0); + builder.trimToSize(); + endSignal.countDown(); + } + + } StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers) throws Exception { super(queueSize, handler, maxPacketSizeBytes, poolSize, workers); - this.messages = new ArrayBlockingQueue(queueSize); + this.messages = new ArrayBlockingQueue<>(queueSize); + } + + @Override + protected ProcessingTask createProcessingTask() { + return new ProcessingTask(); } @Override - boolean send(final String message) { + protected boolean send(final Message message) { try { if (!shutdown) { messages.put(message); @@ -31,72 +109,4 @@ boolean send(final String message) { return false; } - - @Override - public void run() { - - for (int i = 0 ; i < workers ; i++) { - executor.submit(new Runnable() { - public void run() { - boolean empty; - ByteBuffer sendBuffer; - - try { - sendBuffer = bufferPool.borrow(); - } catch (final InterruptedException e) { - handler.handle(e); - return; - } - - while (!(messages.isEmpty() && shutdown)) { - - try { - - if (Thread.interrupted()) { - return; - } - - final String message = messages.poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS); - if (message != null) { - final byte[] data = message.getBytes(MESSAGE_CHARSET); - if (sendBuffer.capacity() < data.length) { - throw new InvalidMessageException(MESSAGE_TOO_LONG, message); - } - if (sendBuffer.remaining() < (data.length + 1)) { - outboundQueue.put(sendBuffer); - sendBuffer = bufferPool.borrow(); - } - if (sendBuffer.position() > 0) { - sendBuffer.put((byte) '\n'); - } - sendBuffer.put(data); - if (null == messages.peek()) { - outboundQueue.put(sendBuffer); - sendBuffer = bufferPool.borrow(); - } - } - } catch (final InterruptedException e) { - if (shutdown) { - endSignal.countDown(); - return; - } - } catch (final Exception e) { - handler.handle(e); - } - } - endSignal.countDown(); - } - }); - } - - boolean done = false; - while (!done) { - try { - endSignal.await(); - done = true; - } catch (final InterruptedException e) { - // NOTHING - } - } - } } diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index 3ae8e651..4a42b4ed 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -1,17 +1,97 @@ package com.timgroup.statsd; -import java.nio.ByteBuffer; +import com.timgroup.statsd.Message; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; + public class StatsDNonBlockingProcessor extends StatsDProcessor { - private final Queue messages; + private final Queue messages; private final int qcapacity; private final AtomicInteger qsize; // qSize will not reflect actual size, but a close estimate. + private class ProcessingTask extends StatsDProcessor.ProcessingTask { + + @Override + public void run() { + boolean empty; + ByteBuffer sendBuffer; + + try { + sendBuffer = bufferPool.borrow(); + } catch (final InterruptedException e) { + handler.handle(e); + return; + } + + while (!((empty = messages.isEmpty()) && shutdown)) { + + try { + if (empty) { + Thread.sleep(WAIT_SLEEP_MS); + continue; + } + + if (Thread.interrupted()) { + return; + } + final Message message = messages.poll(); + if (message != null) { + + qsize.decrementAndGet(); + builder.setLength(0); + + message.writeTo(builder); + int lowerBoundSize = builder.length(); + + if (sendBuffer.capacity() < lowerBoundSize) { + throw new InvalidMessageException(MESSAGE_TOO_LONG, builder.toString()); + } + + if (sendBuffer.remaining() < (lowerBoundSize + 1)) { + outboundQueue.put(sendBuffer); + sendBuffer = bufferPool.borrow(); + } + + sendBuffer.mark(); + if (sendBuffer.position() > 0) { + sendBuffer.put((byte) '\n'); + } + + try { + writeBuilderToSendBuffer(sendBuffer); + } catch (BufferOverflowException boe) { + outboundQueue.put(sendBuffer); + sendBuffer = bufferPool.borrow(); + writeBuilderToSendBuffer(sendBuffer); + } + + if (null == messages.peek()) { + outboundQueue.put(sendBuffer); + sendBuffer = bufferPool.borrow(); + } + } + } catch (final InterruptedException e) { + if (shutdown) { + endSignal.countDown(); + return; + } + } catch (final Exception e) { + handler.handle(e); + } + } + + builder.setLength(0); + builder.trimToSize(); + endSignal.countDown(); + } + } + StatsDNonBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers) throws Exception { @@ -19,11 +99,16 @@ public class StatsDNonBlockingProcessor extends StatsDProcessor { super(queueSize, handler, maxPacketSizeBytes, poolSize, workers); this.qsize = new AtomicInteger(0); this.qcapacity = queueSize; - this.messages = new ConcurrentLinkedQueue(); + this.messages = new ConcurrentLinkedQueue<>(); + } + + @Override + protected ProcessingTask createProcessingTask() { + return new ProcessingTask(); } @Override - boolean send(final String message) { + protected boolean send(final Message message) { if (!shutdown) { if (qsize.get() < qcapacity) { messages.offer(message); @@ -35,78 +120,6 @@ boolean send(final String message) { return false; } - @Override - public void run() { - - for (int i = 0 ; i < workers ; i++) { - executor.submit(new Runnable() { - public void run() { - boolean empty; - ByteBuffer sendBuffer; - - try { - sendBuffer = bufferPool.borrow(); - } catch (final InterruptedException e) { - handler.handle(e); - return; - } - - while (!((empty = messages.isEmpty()) && shutdown)) { - - try { - if (empty) { - Thread.sleep(WAIT_SLEEP_MS); - continue; - } - - if (Thread.interrupted()) { - return; - } - final String message = messages.poll(); - if (message != null) { - qsize.decrementAndGet(); - final byte[] data = message.getBytes(MESSAGE_CHARSET); - if (sendBuffer.capacity() < data.length) { - throw new InvalidMessageException(MESSAGE_TOO_LONG, message); - } - if (sendBuffer.remaining() < (data.length + 1)) { - outboundQueue.put(sendBuffer); - sendBuffer = bufferPool.borrow(); - } - if (sendBuffer.position() > 0) { - sendBuffer.put((byte) '\n'); - } - sendBuffer.put(data); - if (null == messages.peek()) { - outboundQueue.put(sendBuffer); - sendBuffer = bufferPool.borrow(); - } - } - } catch (final InterruptedException e) { - if (shutdown) { - endSignal.countDown(); - return; - } - } catch (final Exception e) { - handler.handle(e); - } - } - endSignal.countDown(); - } - }); - } - - boolean done = false; - while (!done) { - try { - endSignal.await(); - done = true; - } catch (final InterruptedException e) { - // NOTHING - } - } - } - boolean isShutdown() { return shutdown; } diff --git a/src/main/java/com/timgroup/statsd/StatsDProcessor.java b/src/main/java/com/timgroup/statsd/StatsDProcessor.java index 838b9957..b7767ca3 100644 --- a/src/main/java/com/timgroup/statsd/StatsDProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDProcessor.java @@ -1,8 +1,17 @@ package com.timgroup.statsd; +import com.timgroup.statsd.Message; + +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; +import java.util.ArrayList; +import java.util.List; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -14,6 +23,7 @@ public abstract class StatsDProcessor implements Runnable { protected static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8"); + protected static final String MESSAGE_TOO_LONG = "Message longer than size of sendBuffer"; protected static final int WAIT_SLEEP_MS = 10; // 10 ms would be a 100HZ slice @@ -28,6 +38,32 @@ public abstract class StatsDProcessor implements Runnable { protected volatile boolean shutdown; + protected abstract class ProcessingTask implements Runnable { + protected StringBuilder builder = new StringBuilder(); + protected CharBuffer buffer = CharBuffer.wrap(builder); + protected final CharsetEncoder utf8Encoder = MESSAGE_CHARSET.newEncoder() + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE); + + public abstract void run(); + + protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) { + + int length = builder.length(); + // use existing charbuffer if possible, otherwise re-wrap + if (length <= buffer.capacity()) { + buffer.limit(length).position(0); + } else { + buffer = CharBuffer.wrap(builder); + } + + if (utf8Encoder.encode(buffer, sendBuffer, true) == CoderResult.OVERFLOW) { + throw new BufferOverflowException(); + } + } + } + + StatsDProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers) throws Exception { @@ -41,7 +77,9 @@ public abstract class StatsDProcessor implements Runnable { this.endSignal = new CountDownLatch(workers); } - abstract boolean send(final String message); + protected abstract ProcessingTask createProcessingTask(); + + protected abstract boolean send(final Message message); public BufferPool getBufferPool() { return this.bufferPool; @@ -52,7 +90,22 @@ public BlockingQueue getOutboundQueue() { } @Override - public abstract void run(); + public void run() { + + for (int i = 0 ; i < workers ; i++) { + executor.submit(createProcessingTask()); + } + + boolean done = false; + while (!done) { + try { + endSignal.await(); + done = true; + } catch (final InterruptedException e) { + // NOTHING + } + } + } boolean isShutdown() { return shutdown; diff --git a/src/main/java/com/timgroup/statsd/StatsDSender.java b/src/main/java/com/timgroup/statsd/StatsDSender.java index caa691d6..44d9500b 100644 --- a/src/main/java/com/timgroup/statsd/StatsDSender.java +++ b/src/main/java/com/timgroup/statsd/StatsDSender.java @@ -4,12 +4,8 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; -import java.nio.charset.Charset; - -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; diff --git a/src/main/java/com/timgroup/statsd/Telemetry.java b/src/main/java/com/timgroup/statsd/Telemetry.java index 5437bb53..4cd1d95a 100644 --- a/src/main/java/com/timgroup/statsd/Telemetry.java +++ b/src/main/java/com/timgroup/statsd/Telemetry.java @@ -1,5 +1,7 @@ package com.timgroup.statsd; +import com.timgroup.statsd.Message; + import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicInteger; @@ -8,23 +10,26 @@ public class Telemetry { public static int DEFAULT_FLUSH_INTERVAL = 10000; // 10s - protected AtomicInteger metricsSent; - protected AtomicInteger eventsSent; - protected AtomicInteger serviceChecksSent; - protected AtomicInteger bytesSent; - protected AtomicInteger bytesDropped; - protected AtomicInteger packetsSent; - protected AtomicInteger packetsDropped; - protected AtomicInteger packetsDroppedQueue; - - protected String metricsSentMetric; - protected String eventsSentMetric; - protected String serviceChecksSentMetric; - protected String bytesSentMetric; - protected String bytesDroppedMetric; - protected String packetsSentMetric; - protected String packetsDroppedMetric; - protected String packetsDroppedQueueMetric; + protected final AtomicInteger metricsSent = new AtomicInteger(0); + + protected final AtomicInteger eventsSent = new AtomicInteger(0); + protected final AtomicInteger serviceChecksSent = new AtomicInteger(0); + protected final AtomicInteger bytesSent = new AtomicInteger(0); + protected final AtomicInteger bytesDropped = new AtomicInteger(0); + protected final AtomicInteger packetsSent = new AtomicInteger(0); + protected final AtomicInteger packetsDropped = new AtomicInteger(0); + protected final AtomicInteger packetsDroppedQueue = new AtomicInteger(0); + + protected final String metricsSentMetric = "datadog.dogstatsd.client.metrics"; + protected final String eventsSentMetric = "datadog.dogstatsd.client.events"; + protected final String serviceChecksSentMetric = "datadog.dogstatsd.client.service_checks"; + protected final String bytesSentMetric = "datadog.dogstatsd.client.bytes_sent"; + protected final String bytesDroppedMetric = "datadog.dogstatsd.client.bytes_dropped"; + protected final String packetsSentMetric = "datadog.dogstatsd.client.packets_sent"; + protected final String packetsDroppedMetric = "datadog.dogstatsd.client.packets_dropped"; + protected final String packetsDroppedQueueMetric = "datadog.dogstatsd.client.packets_dropped_queue"; + + protected String tags; public StatsDProcessor processor; protected Timer timer; @@ -42,32 +47,41 @@ public void run() { } } + class TelemetryMessage implements Message { + private final String aspect; + private final String type = "c"; // all counters + private final String tags; // pre-baked comma separeated tags string + private final int value; + + protected TelemetryMessage(String metric, int value, String tags) { + this.aspect = metric; + this.tags = tags; + this.value = value; + } + + @Override + public final void writeTo(StringBuilder builder) { + builder.append(aspect) + .append(':') + .append(this.value) + .append('|') + .append(type) + .append(tags); // already has the statsd separator baked-in + } + } + Telemetry(final String tags, final StatsDProcessor processor) { // precompute metrics lines with tags - this.metricsSentMetric = "datadog.dogstatsd.client.metrics:%d|c" + tags; - this.eventsSentMetric = "datadog.dogstatsd.client.events:%d|c" + tags; - this.serviceChecksSentMetric = "datadog.dogstatsd.client.service_checks:%d|c" + tags; - this.bytesSentMetric = "datadog.dogstatsd.client.bytes_sent:%d|c" + tags; - this.bytesDroppedMetric = "datadog.dogstatsd.client.bytes_dropped:%d|c" + tags; - this.packetsSentMetric = "datadog.dogstatsd.client.packets_sent:%d|c" + tags; - this.packetsDroppedMetric = "datadog.dogstatsd.client.packets_dropped:%d|c" + tags; - this.packetsDroppedQueueMetric = "datadog.dogstatsd.client.packets_dropped_queue:%d|c" + tags; - - this.metricsSent = new AtomicInteger(0); - this.eventsSent = new AtomicInteger(0); - this.serviceChecksSent = new AtomicInteger(0); - this.bytesSent = new AtomicInteger(0); - this.bytesDropped = new AtomicInteger(0); - this.packetsSent = new AtomicInteger(0); - this.packetsDropped = new AtomicInteger(0); - this.packetsDroppedQueue = new AtomicInteger(0); - + this.tags = tags; this.processor = processor; this.timer = null; } /** * Startsthe flush timer for the telemetry. + * + * @param flushInterval + * Telemetry flush interval in seconds. */ public void start(final long flushInterval) { // flush the telemetry at regualar interval @@ -92,14 +106,14 @@ public void flush() { // be spread out among processor worker and we flush every 5s by // default - this.processor.send(String.format(this.metricsSentMetric, this.metricsSent.getAndSet(0))); - this.processor.send(String.format(this.eventsSentMetric, this.eventsSent.getAndSet(0))); - this.processor.send(String.format(this.serviceChecksSentMetric, this.serviceChecksSent.getAndSet(0))); - this.processor.send(String.format(this.bytesSentMetric, this.bytesSent.getAndSet(0))); - this.processor.send(String.format(this.bytesDroppedMetric, this.bytesDropped.getAndSet(0))); - this.processor.send(String.format(this.packetsSentMetric, this.packetsSent.getAndSet(0))); - this.processor.send(String.format(this.packetsDroppedMetric, this.packetsDropped.getAndSet(0))); - this.processor.send(String.format(this.packetsDroppedQueueMetric, this.packetsDroppedQueue.getAndSet(0))); + processor.send(new TelemetryMessage(this.metricsSentMetric, this.metricsSent.getAndSet(0), tags)); + processor.send(new TelemetryMessage(this.eventsSentMetric, this.eventsSent.getAndSet(0), tags)); + processor.send(new TelemetryMessage(this.serviceChecksSentMetric, this.serviceChecksSent.getAndSet(0), tags)); + processor.send(new TelemetryMessage(this.bytesSentMetric, this.bytesSent.getAndSet(0), tags)); + processor.send(new TelemetryMessage(this.bytesDroppedMetric, this.bytesDropped.getAndSet(0), tags)); + processor.send(new TelemetryMessage(this.packetsSentMetric, this.packetsSent.getAndSet(0), tags)); + processor.send(new TelemetryMessage(this.packetsDroppedMetric, this.packetsDropped.getAndSet(0), tags)); + processor.send(new TelemetryMessage(this.packetsDroppedQueueMetric, this.packetsDroppedQueue.getAndSet(0), tags)); } public void incrMetricsSent(final int value) { diff --git a/src/test/java/com/timgroup/statsd/DummyStatsDServer.java b/src/test/java/com/timgroup/statsd/DummyStatsDServer.java index 359bd944..4bfebc15 100644 --- a/src/test/java/com/timgroup/statsd/DummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/DummyStatsDServer.java @@ -8,6 +8,10 @@ import java.nio.channels.DatagramChannel; import java.util.ArrayList; import java.util.List; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.LinkedBlockingQueue; + import jnr.unixsocket.UnixDatagramChannel; import jnr.unixsocket.UnixSocketAddress; import java.nio.charset.StandardCharsets; @@ -15,6 +19,7 @@ class DummyStatsDServer { private final List messagesReceived = new ArrayList(); + private AtomicInteger packetsReceived = new AtomicInteger(0); protected final DatagramChannel server; protected volatile Boolean freeze = false; @@ -48,6 +53,8 @@ public void run() { ((Buffer)packet).clear(); // Cast necessary to handle Java9 covariant return types // see: https://jira.mongodb.org/browse/JAVA-2559 for ref. server.receive(packet); + packetsReceived.addAndGet(1); + packet.flip(); for (String msg : StandardCharsets.UTF_8.decode(packet).toString().split("\n")) { messagesReceived.add(msg.trim()); @@ -75,6 +82,10 @@ public List messagesReceived() { return new ArrayList(messagesReceived); } + public int packetsReceived() { + return packetsReceived.get(); + } + public void freeze() { freeze = true; } @@ -92,6 +103,7 @@ public void close() throws IOException { } public void clear() { + packetsReceived.set(0); messagesReceived.clear(); } diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java index 5d4c050e..5a7359b9 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java @@ -3,11 +3,11 @@ import java.io.IOException; import java.net.SocketException; -import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; +import java.util.Random; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -65,6 +65,9 @@ public void run() { } catch (InterruptedException ex) {} } + log.info("Messages at server: " + messages); + log.info("Packets at server: " + server.packetsReceived()); + assertEquals(testSize, server.messagesReceived().size()); } } diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index d252c961..cf71fa42 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -7,6 +7,8 @@ import org.junit.Rule; import org.junit.contrib.java.lang.system.EnvironmentVariables; +import com.timgroup.statsd.Message; + import java.io.IOException; import java.net.SocketAddress; import java.util.List; @@ -772,6 +774,7 @@ public void sends_too_large_message() throws Exception { final Exception exception = exceptions.get(0); assertEquals(InvalidMessageException.class, exception.getClass()); assertTrue(((InvalidMessageException)exception).getInvalidMessage().startsWith("_sc|toolong|")); + // assertEquals(BufferOverflowException.class, exception.getClass()); final List messages = server.messagesReceived(); assertEquals(1, messages.size()); diff --git a/src/test/java/com/timgroup/statsd/RecordingErrorHandler.java b/src/test/java/com/timgroup/statsd/RecordingErrorHandler.java index bf79979f..4ddc7635 100644 --- a/src/test/java/com/timgroup/statsd/RecordingErrorHandler.java +++ b/src/test/java/com/timgroup/statsd/RecordingErrorHandler.java @@ -1,13 +1,16 @@ package com.timgroup.statsd; import java.util.ArrayList; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.List; +import java.util.Queue; + /** * @author Taylor Schilling */ public class RecordingErrorHandler implements StatsDClientErrorHandler { - private final List exceptions = new ArrayList(); + private final Queue exceptions = new ConcurrentLinkedQueue<>(); @Override public void handle(final Exception exception) { @@ -15,6 +18,6 @@ public void handle(final Exception exception) { } public List getExceptions() { - return exceptions; + return new ArrayList(exceptions); } } diff --git a/src/test/java/com/timgroup/statsd/TelemetryTest.java b/src/test/java/com/timgroup/statsd/TelemetryTest.java index ed242a43..c451d508 100644 --- a/src/test/java/com/timgroup/statsd/TelemetryTest.java +++ b/src/test/java/com/timgroup/statsd/TelemetryTest.java @@ -12,6 +12,8 @@ import java.util.ArrayList; import java.util.Properties; +import com.timgroup.statsd.Message; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.equalTo; @@ -26,25 +28,47 @@ public class TelemetryTest { // fakeProcessor store messages from the telemetry only public static class FakeProcessor extends StatsDProcessor { - public final List messages = new ArrayList(); + public final List messages = new ArrayList<>(); FakeProcessor(final StatsDClientErrorHandler handler) throws Exception { super(0, handler, 0, 1, 1); } + + private class FakeProcessingTask extends StatsDProcessor.ProcessingTask { + @Override + public void run() {} + } + @Override - public boolean send(final String msg) { - messages.add(msg.trim()); + public boolean send(final Message msg) { + messages.add(msg); return true; } @Override public void run(){} - public List getMessages() { + @Override + protected ProcessingTask createProcessingTask() { + return new FakeProcessingTask(); + } + + public List getMessages() { return messages; } + protected List getMessagesAsStrings() { + StringBuilder sb = new StringBuilder(); + ArrayList stringMessages = new ArrayList<>(messages.size()); + for(Message m : messages) { + sb.setLength(0); + m.writeTo(sb); + stringMessages.add(sb.toString()); + } + return stringMessages; + } + public void clear() { try { messages.clear(); @@ -173,28 +197,30 @@ public void telemetry_incrManuallyIncrData() throws Exception { assertThat(client.telemetry.packetsDropped.get(), equalTo(0)); assertThat(client.telemetry.packetsDroppedQueue.get(), equalTo(0)); - assertThat(fakeProcessor.messages, + List statsdMessages = fakeProcessor.getMessagesAsStrings() ; + + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics:1|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.events:2|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.service_checks:3|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_sent:4|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_dropped:5|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_sent:6|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_dropped:7|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_dropped_queue:8|c|#test," + telemetryTags)); } @@ -211,28 +237,30 @@ public void telemetry_incrMetricsSent() throws Exception { client.telemetry.flush(); assertThat(client.telemetry.metricsSent.get(), equalTo(0)); - assertThat(fakeProcessor.messages, + List statsdMessages = fakeProcessor.getMessagesAsStrings() ; + + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics:1|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.events:0|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.service_checks:0|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_sent:28|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_dropped:0|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_sent:1|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_dropped:0|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_dropped_queue:0|c|#test," + telemetryTags)); } @@ -293,14 +321,16 @@ public void telemetry_flushInterval() throws Exception { assertThat(client.telemetry.metricsSent.get(), equalTo(0)); - assertThat(fakeProcessor.messages, hasItem("datadog.dogstatsd.client.metrics:1|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, hasItem("datadog.dogstatsd.client.events:0|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, hasItem("datadog.dogstatsd.client.service_checks:0|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, hasItem("datadog.dogstatsd.client.bytes_sent:0|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, hasItem("datadog.dogstatsd.client.bytes_dropped:0|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, hasItem("datadog.dogstatsd.client.packets_sent:0|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, hasItem("datadog.dogstatsd.client.packets_dropped:0|c|#test," + telemetryTags)); - assertThat(fakeProcessor.messages, hasItem("datadog.dogstatsd.client.packets_dropped_queue:0|c|#test," + telemetryTags)); + List statsdMessages = fakeProcessor.getMessagesAsStrings() ; + + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics:1|c|#test," + telemetryTags)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.events:0|c|#test," + telemetryTags)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.service_checks:0|c|#test," + telemetryTags)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_sent:0|c|#test," + telemetryTags)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_dropped:0|c|#test," + telemetryTags)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_sent:0|c|#test," + telemetryTags)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_dropped:0|c|#test," + telemetryTags)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_dropped_queue:0|c|#test," + telemetryTags)); } @Test(timeout = 5000L)