Skip to content

Commit

Permalink
cleanup, refactor, bad rebase fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
truthbk committed Mar 10, 2020
1 parent 6cf2806 commit c99049b
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 183 deletions.
50 changes: 32 additions & 18 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -520,24 +520,6 @@ public void recordGaugeValue(final String aspect, final double value, final doub
send(aspect, value, "g", sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void recordGaugeValue(final String aspect, final double value, final double sampleRate, final String... tags) {
if (isInvalidSample(sampleRate)) {
return;
}
send(new StringBuilder(prefix)
.append(aspect)
.append(":")
.append(NUMBER_FORMATTERS.get().format(value))
.append("|g|@")
.append(SAMPLE_RATE_FORMATTERS.get().format(sampleRate))
.append(tagString(tags))
.toString());
}

/**
* Records the latest fixed value for the specified named gauge.
*
Expand Down Expand Up @@ -703,6 +685,22 @@ public void histogram(final String aspect, final long value, final double sample
recordHistogramValue(aspect, value, sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void histogram(String aspect, double value, String... tags){
recordHistogramValue(aspect, value, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void histogram(String aspect, double value, double sampleRate, String... tags){
recordHistogramValue(aspect, value, sampleRate, tags);
}

/**
* Records a value for the specified named distribution.
*
Expand Down Expand Up @@ -773,6 +771,22 @@ public void distribution(final String aspect, final long value, final double sam
recordDistributionValue(aspect, value, sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void distribution(String aspect, double value, String... tags){
recordDistributionValue(aspect, value, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void distribution(String aspect, double value, double sampleRate, String... tags){
recordDistributionValue(aspect, value, sampleRate, tags);
}

private StringBuilder eventMap(final Event event, StringBuilder res) {
final long millisSinceEpoch = event.getMillisSinceEpoch();
if (millisSinceEpoch != -1) {
Expand Down
161 changes: 85 additions & 76 deletions src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.timgroup.statsd.Message;

import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -12,16 +13,97 @@ public class StatsDBlockingProcessor extends StatsDProcessor {

private final BlockingQueue<Message> messages;

private class ProcessingTask implements Runnable {
private final int id;

ProcessingTask(int id) {
this.id = id;
}

@Override
public void run() {
boolean empty;
ByteBuffer sendBuffer;
StringBuilder builder = builders.get(id);

try {
sendBuffer = bufferPool.borrow();
} catch (final InterruptedException e) {
handler.handle(e);
return;
}

while (!(messages.isEmpty() && shutdown)) {

try {

if (Thread.interrupted()) {
return;
}

final Message message = messages.poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS);
// TODO: revisit this logic - cleanup, remove duplicate code.
if (message != null) {

builder.setLength(0);

message.writeTo(builder);
int lowerBoundSize = builder.length();

// if (sendBuffer.capacity() < data.length) {
// throw new InvalidMessageException(MESSAGE_TOO_LONG, message);
// }

if (sendBuffer.remaining() < (lowerBoundSize + 1)) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}

sendBuffer.mark();
if (sendBuffer.position() > 0) {
sendBuffer.put((byte) '\n');
}

try {
writeBuilderToSendBuffer(id, builder, sendBuffer);
} catch (BufferOverflowException boe) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
writeBuilderToSendBuffer(id, builder, sendBuffer);
}

// TODO: revisit this logic
if (null == messages.peek()) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}
}
} catch (final InterruptedException e) {
if (shutdown) {
endSignal.countDown();
return;
}
} catch (final Exception e) {
handler.handle(e);
}
}

builder.setLength(0);
builder.trimToSize();
endSignal.countDown();
}
}

StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
final int maxPacketSizeBytes, final int poolSize, final int workers)
throws Exception {

super(queueSize, handler, maxPacketSizeBytes, poolSize, workers);
this.messages = new ArrayBlockingQueue<String>(queueSize);
this.messages = new ArrayBlockingQueue<>(queueSize);
}

@Override
boolean send(final Message message);
boolean send(final Message message){
try {
if (!shutdown) {
messages.put(message);
Expand All @@ -38,80 +120,7 @@ public class StatsDBlockingProcessor extends StatsDProcessor {
public void run() {

for (int i = 0 ; i < workers ; i++) {
executor.submit(new Runnable() {
public void run() {
boolean empty;
ByteBuffer sendBuffer;
StringBuilder builder = new StringBuilder();
CharBuffer charBuffer = CharBuffer.wrap(builder);

try {
sendBuffer = bufferPool.borrow();
} catch (final InterruptedException e) {
handler.handle(e);
return;
}

while (!(messages.isEmpty() && shutdown)) {

try {

if (Thread.interrupted()) {
return;
}

final Message message = messages.poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS);
// TODO: revisit this logic - cleanup, remove duplicate code.
if (message != null) {

builder.setLength(0);

message.writeTo(builder);
int lowerBoundSize = builder.length();

// if (sendBuffer.capacity() < data.length) {
// throw new InvalidMessageException(MESSAGE_TOO_LONG, message);
// }

if (sendBuffer.remaining() < (lowerBoundSize + 1)) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}

sendBuffer.mark();
if (sendBuffer.position() > 0) {
sendBuffer.put((byte) '\n');
}

try {
charBuffer = writeBuilderToSendBuffer(builder, charBuffer, sendBuffer);
} catch (BufferOverflowException boe) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
charBuffer = writeBuilderToSendBuffer(builder, charBuffer, sendBuffer);
}

// TODO: revisit this logic
if (null == messages.peek()) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}
}
} catch (final InterruptedException e) {
if (shutdown) {
endSignal.countDown();
return;
}
} catch (final Exception e) {
handler.handle(e);
}
}

builder.setLength(0);
builder.trimToSize();
endSignal.countDown();
}
});
executor.submit(new ProcessingTask(i));
}

boolean done = false;
Expand Down
Loading

0 comments on commit c99049b

Please sign in to comment.