diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 4e25edf2..4de18a99 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -520,24 +520,6 @@ public void recordGaugeValue(final String aspect, final double value, final doub send(aspect, value, "g", sampleRate, tags); } - /** - * {@inheritDoc} - */ - @Override - public void recordGaugeValue(final String aspect, final double value, final double sampleRate, final String... tags) { - if (isInvalidSample(sampleRate)) { - return; - } - send(new StringBuilder(prefix) - .append(aspect) - .append(":") - .append(NUMBER_FORMATTERS.get().format(value)) - .append("|g|@") - .append(SAMPLE_RATE_FORMATTERS.get().format(sampleRate)) - .append(tagString(tags)) - .toString()); - } - /** * Records the latest fixed value for the specified named gauge. * @@ -703,6 +685,22 @@ public void histogram(final String aspect, final long value, final double sample recordHistogramValue(aspect, value, sampleRate, tags); } + /** + * {@inheritDoc} + */ + @Override + public void histogram(String aspect, double value, String... tags){ + recordHistogramValue(aspect, value, tags); + } + + /** + * {@inheritDoc} + */ + @Override + public void histogram(String aspect, double value, double sampleRate, String... tags){ + recordHistogramValue(aspect, value, sampleRate, tags); + } + /** * Records a value for the specified named distribution. * @@ -773,6 +771,22 @@ public void distribution(final String aspect, final long value, final double sam recordDistributionValue(aspect, value, sampleRate, tags); } + /** + * {@inheritDoc} + */ + @Override + public void distribution(String aspect, double value, String... tags){ + recordDistributionValue(aspect, value, tags); + } + + /** + * {@inheritDoc} + */ + @Override + public void distribution(String aspect, double value, double sampleRate, String... tags){ + recordDistributionValue(aspect, value, sampleRate, tags); + } + private StringBuilder eventMap(final Event event, StringBuilder res) { final long millisSinceEpoch = event.getMillisSinceEpoch(); if (millisSinceEpoch != -1) { diff --git a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java index cf5c960f..485e83d7 100644 --- a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java @@ -2,6 +2,7 @@ import com.timgroup.statsd.Message; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.concurrent.ArrayBlockingQueue; @@ -12,16 +13,97 @@ public class StatsDBlockingProcessor extends StatsDProcessor { private final BlockingQueue messages; + private class ProcessingTask implements Runnable { + private final int id; + + ProcessingTask(int id) { + this.id = id; + } + + @Override + public void run() { + boolean empty; + ByteBuffer sendBuffer; + StringBuilder builder = builders.get(id); + + try { + sendBuffer = bufferPool.borrow(); + } catch (final InterruptedException e) { + handler.handle(e); + return; + } + + while (!(messages.isEmpty() && shutdown)) { + + try { + + if (Thread.interrupted()) { + return; + } + + final Message message = messages.poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS); + // TODO: revisit this logic - cleanup, remove duplicate code. + if (message != null) { + + builder.setLength(0); + + message.writeTo(builder); + int lowerBoundSize = builder.length(); + + // if (sendBuffer.capacity() < data.length) { + // throw new InvalidMessageException(MESSAGE_TOO_LONG, message); + // } + + if (sendBuffer.remaining() < (lowerBoundSize + 1)) { + outboundQueue.put(sendBuffer); + sendBuffer = bufferPool.borrow(); + } + + sendBuffer.mark(); + if (sendBuffer.position() > 0) { + sendBuffer.put((byte) '\n'); + } + + try { + writeBuilderToSendBuffer(id, builder, sendBuffer); + } catch (BufferOverflowException boe) { + outboundQueue.put(sendBuffer); + sendBuffer = bufferPool.borrow(); + writeBuilderToSendBuffer(id, builder, sendBuffer); + } + + // TODO: revisit this logic + if (null == messages.peek()) { + outboundQueue.put(sendBuffer); + sendBuffer = bufferPool.borrow(); + } + } + } catch (final InterruptedException e) { + if (shutdown) { + endSignal.countDown(); + return; + } + } catch (final Exception e) { + handler.handle(e); + } + } + + builder.setLength(0); + builder.trimToSize(); + endSignal.countDown(); + } + } + StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers) throws Exception { super(queueSize, handler, maxPacketSizeBytes, poolSize, workers); - this.messages = new ArrayBlockingQueue(queueSize); + this.messages = new ArrayBlockingQueue<>(queueSize); } @Override - boolean send(final Message message); + boolean send(final Message message){ try { if (!shutdown) { messages.put(message); @@ -38,80 +120,7 @@ public class StatsDBlockingProcessor extends StatsDProcessor { public void run() { for (int i = 0 ; i < workers ; i++) { - executor.submit(new Runnable() { - public void run() { - boolean empty; - ByteBuffer sendBuffer; - StringBuilder builder = new StringBuilder(); - CharBuffer charBuffer = CharBuffer.wrap(builder); - - try { - sendBuffer = bufferPool.borrow(); - } catch (final InterruptedException e) { - handler.handle(e); - return; - } - - while (!(messages.isEmpty() && shutdown)) { - - try { - - if (Thread.interrupted()) { - return; - } - - final Message message = messages.poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS); - // TODO: revisit this logic - cleanup, remove duplicate code. - if (message != null) { - - builder.setLength(0); - - message.writeTo(builder); - int lowerBoundSize = builder.length(); - - // if (sendBuffer.capacity() < data.length) { - // throw new InvalidMessageException(MESSAGE_TOO_LONG, message); - // } - - if (sendBuffer.remaining() < (lowerBoundSize + 1)) { - outboundQueue.put(sendBuffer); - sendBuffer = bufferPool.borrow(); - } - - sendBuffer.mark(); - if (sendBuffer.position() > 0) { - sendBuffer.put((byte) '\n'); - } - - try { - charBuffer = writeBuilderToSendBuffer(builder, charBuffer, sendBuffer); - } catch (BufferOverflowException boe) { - outboundQueue.put(sendBuffer); - sendBuffer = bufferPool.borrow(); - charBuffer = writeBuilderToSendBuffer(builder, charBuffer, sendBuffer); - } - - // TODO: revisit this logic - if (null == messages.peek()) { - outboundQueue.put(sendBuffer); - sendBuffer = bufferPool.borrow(); - } - } - } catch (final InterruptedException e) { - if (shutdown) { - endSignal.countDown(); - return; - } - } catch (final Exception e) { - handler.handle(e); - } - } - - builder.setLength(0); - builder.trimToSize(); - endSignal.countDown(); - } - }); + executor.submit(new ProcessingTask(i)); } boolean done = false; diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index 5d02d40b..5acad07e 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -2,6 +2,7 @@ import com.timgroup.statsd.Message; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.Queue; @@ -14,6 +15,91 @@ public class StatsDNonBlockingProcessor extends StatsDProcessor { private final int qcapacity; private final AtomicInteger qsize; // qSize will not reflect actual size, but a close estimate. + private class ProcessingTask implements Runnable { + private final int id; + + ProcessingTask(int id) { + this.id = id; + } + + @Override + public void run() { + boolean empty; + ByteBuffer sendBuffer; + StringBuilder builder = builders.get(id); + + try { + sendBuffer = bufferPool.borrow(); + } catch (final InterruptedException e) { + handler.handle(e); + return; + } + + while (!((empty = messages.isEmpty()) && shutdown)) { + + try { + if (empty) { + Thread.sleep(WAIT_SLEEP_MS); + continue; + } + + if (Thread.interrupted()) { + return; + } + final Message message = messages.poll(); + // TODO: revisit this logic - cleanup, remove duplicate code. + if (message != null) { + + qsize.decrementAndGet(); + builder.setLength(0); + + message.writeTo(builder); + int lowerBoundSize = builder.length(); + + // if (sendBuffer.capacity() < data.length) { + // throw new InvalidMessageException(MESSAGE_TOO_LONG, message); + // } + + if (sendBuffer.remaining() < (lowerBoundSize + 1)) { + outboundQueue.put(sendBuffer); + sendBuffer = bufferPool.borrow(); + } + + sendBuffer.mark(); + if (sendBuffer.position() > 0) { + sendBuffer.put((byte) '\n'); + } + + try { + writeBuilderToSendBuffer(id, builder, sendBuffer); + } catch (BufferOverflowException boe) { + outboundQueue.put(sendBuffer); + sendBuffer = bufferPool.borrow(); + writeBuilderToSendBuffer(id, builder, sendBuffer); + } + + // TODO: revisit this logic + if (null == messages.peek()) { + outboundQueue.put(sendBuffer); + sendBuffer = bufferPool.borrow(); + } + } + } catch (final InterruptedException e) { + if (shutdown) { + endSignal.countDown(); + return; + } + } catch (final Exception e) { + handler.handle(e); + } + } + + builder.setLength(0); + builder.trimToSize(); + endSignal.countDown(); + } + } + StatsDNonBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers) throws Exception { @@ -21,11 +107,11 @@ public class StatsDNonBlockingProcessor extends StatsDProcessor { super(queueSize, handler, maxPacketSizeBytes, poolSize, workers); this.qsize = new AtomicInteger(0); this.qcapacity = queueSize; - this.messages = new ConcurrentLinkedQueue(); + this.messages = new ConcurrentLinkedQueue<>(); } @Override - boolean send(final Message message); + boolean send(final Message message){ if (!shutdown) { if (qsize.get() < qcapacity) { messages.offer(message); @@ -41,84 +127,7 @@ public class StatsDNonBlockingProcessor extends StatsDProcessor { public void run() { for (int i = 0 ; i < workers ; i++) { - executor.submit(new Runnable() { - public void run() { - boolean empty; - ByteBuffer sendBuffer; - StringBuilder builder = new StringBuilder(); - CharBuffer charBuffer = CharBuffer.wrap(builder); - - try { - sendBuffer = bufferPool.borrow(); - } catch (final InterruptedException e) { - handler.handle(e); - return; - } - - while (!((empty = messages.isEmpty()) && shutdown)) { - - try { - if (empty) { - Thread.sleep(WAIT_SLEEP_MS); - continue; - } - - if (Thread.interrupted()) { - return; - } - final String message = messages.poll(); - // TODO: revisit this logic - cleanup, remove duplicate code. - if (message != null) { - - qsize.decrementAndGet(); - builder.setLength(0); - - message.writeTo(builder); - int lowerBoundSize = builder.length(); - - // if (sendBuffer.capacity() < data.length) { - // throw new InvalidMessageException(MESSAGE_TOO_LONG, message); - // } - - if (sendBuffer.remaining() < (lowerBoundSize + 1)) { - outboundQueue.put(sendBuffer); - sendBuffer = bufferPool.borrow(); - } - - sendBuffer.mark(); - if (sendBuffer.position() > 0) { - sendBuffer.put((byte) '\n'); - } - - try { - charBuffer = writeBuilderToSendBuffer(builder, charBuffer, sendBuffer); - } catch (BufferOverflowException boe) { - outboundQueue.put(sendBuffer); - sendBuffer = bufferPool.borrow(); - charBuffer = writeBuilderToSendBuffer(builder, charBuffer, sendBuffer); - } - - // TODO: revisit this logic - if (null == messages.peek()) { - outboundQueue.put(sendBuffer); - sendBuffer = bufferPool.borrow(); - } - } - } catch (final InterruptedException e) { - if (shutdown) { - endSignal.countDown(); - return; - } - } catch (final Exception e) { - handler.handle(e); - } - } - - builder.setLength(0); - builder.trimToSize(); - endSignal.countDown(); - } - }); + executor.submit(new ProcessingTask(i)); } boolean done = false; diff --git a/src/main/java/com/timgroup/statsd/StatsDProcessor.java b/src/main/java/com/timgroup/statsd/StatsDProcessor.java index 8c688b34..7c647eb7 100644 --- a/src/main/java/com/timgroup/statsd/StatsDProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDProcessor.java @@ -2,12 +2,16 @@ import com.timgroup.statsd.Message; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; +import java.util.ArrayList; +import java.util.List; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -29,6 +33,8 @@ public abstract class StatsDProcessor implements Runnable { protected final StatsDClientErrorHandler handler; protected final BufferPool bufferPool; + protected final List builders; // StringBuilders for processing, 1 per worker + protected final List charBuffers; // CharBuffers for processing, 1 per worker protected final BlockingQueue outboundQueue; // FIFO queue with outbound buffers protected final ExecutorService executor; protected final CountDownLatch endSignal; @@ -48,6 +54,15 @@ public abstract class StatsDProcessor implements Runnable { this.bufferPool = new BufferPool(poolSize, maxPacketSizeBytes, true); this.outboundQueue = new ArrayBlockingQueue(poolSize); this.endSignal = new CountDownLatch(workers); + + this.builders = new ArrayList<>(workers); + this.charBuffers = new ArrayList<>(workers); + for(int i=0; i getOutboundQueue() { @Override public abstract void run(); - private CharBuffer writeBuilderToSendBuffer(StringBuilder builder, CharBuffer charBuffer, ByteBuffer sendBuffer) { + protected void writeBuilderToSendBuffer(int workerId, StringBuilder builder, ByteBuffer sendBuffer) { + CharBuffer buffer = charBuffers.get(workerId); + int length = builder.length(); // use existing charbuffer if possible, otherwise re-wrap if (length <= buffer.capacity()) { - charBuffer.limit(length).position(0); + buffer.limit(length).position(0); } else { - charBuffer = buffer.wrap(builder); + buffer = CharBuffer.wrap(builder); + charBuffers.set(workerId, buffer); } - if (utf8Encoder.encode(charBuffer, sendBuffer, true) == CoderResult.OVERFLOW) { - // FIXME: if we throw an exception here we won't return the charbuffer. - // Broken currently. + if (utf8Encoder.encode(buffer, sendBuffer, true) == CoderResult.OVERFLOW) { throw new BufferOverflowException(); } - - return charBuffer; } boolean isShutdown() { diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index 45becb58..b7f5a873 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -6,7 +6,7 @@ import org.junit.Test; import org.junit.Rule; -import com.timgroup.statsd.StatsDSender.Message; +import com.timgroup.statsd.Message; import java.io.IOException; import java.net.SocketAddress;