Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved abstractions + better object construction #96

Merged
merged 12 commits into from
Jan 29, 2020
656 changes: 24 additions & 632 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Large diffs are not rendered by default.

206 changes: 206 additions & 0 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package com.timgroup.statsd;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Callable;

import jnr.unixsocket.UnixSocketAddress;

public class NonBlockingStatsDClientBuilder {

/**
* 1400 chosen as default here so that the number of bytes in a message plus the number of bytes required
* for additional udp headers should be under the 1500 Maximum Transmission Unit for ethernet.
* See https://github.com/DataDog/java-dogstatsd-client/pull/17 for discussion.
*/

public int port = NonBlockingStatsDClient.DEFAULT_DOGSTATSD_PORT;
public int queueSize = NonBlockingStatsDClient.DEFAULT_QUEUE_SIZE;
public int timeout = NonBlockingStatsDClient.SOCKET_TIMEOUT_MS;
public int bufferPoolSize = NonBlockingStatsDClient.DEFAULT_POOL_SIZE;
public int socketBufferSize = NonBlockingStatsDClient.SOCKET_BUFFER_BYTES;
public int maxPacketSizeBytes = NonBlockingStatsDClient.DEFAULT_MAX_PACKET_SIZE_BYTES;
public int processorWorkers = NonBlockingStatsDClient.DEFAULT_PROCESSOR_WORKERS;
public int senderWorkers = NonBlockingStatsDClient.DEFAULT_SENDER_WORKERS;
public boolean blocking;

public Callable<SocketAddress> addressLookup;

public String hostname;
public String prefix;
public String entityID;
public String[] constantTags;

public StatsDClientErrorHandler errorHandler;

public NonBlockingStatsDClientBuilder() { }

public NonBlockingStatsDClientBuilder port(int val) {
port = val;
return this;
}
public NonBlockingStatsDClientBuilder queueSize(int val) {
queueSize = val;
return this;
}
public NonBlockingStatsDClientBuilder timeout(int val) {
timeout = val;
return this;
}
public NonBlockingStatsDClientBuilder bufferPoolSize(int val) {
bufferPoolSize = val;
return this;
}
public NonBlockingStatsDClientBuilder socketBufferSize(int val) {
socketBufferSize = val;
return this;
}
public NonBlockingStatsDClientBuilder maxPacketSizeBytes(int val) {
maxPacketSizeBytes = val;
return this;
}
public NonBlockingStatsDClientBuilder processorWorkers(int val) {
processorWorkers = val;
return this;
}
public NonBlockingStatsDClientBuilder senderWorkers(int val) {
senderWorkers = val;
return this;
}
public NonBlockingStatsDClientBuilder blocking(boolean val) {
blocking = val;
return this;
}
public NonBlockingStatsDClientBuilder addressLookup(Callable<SocketAddress> val) {
addressLookup = val;
return this;
}
public NonBlockingStatsDClientBuilder hostname(String val) {
hostname = val;
return this;
}
public NonBlockingStatsDClientBuilder prefix(String val) {
prefix = val;
return this;
}
public NonBlockingStatsDClientBuilder entityID(String val) {
entityID = val;
return this;
}
public NonBlockingStatsDClientBuilder constantTags(String... val) {
constantTags = val;
return this;
}
public NonBlockingStatsDClientBuilder errorHandler(StatsDClientErrorHandler val) {
errorHandler = val;
return this;
}
public NonBlockingStatsDClient build() throws StatsDClientException {
if (addressLookup != null) {
return new NonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler,
addressLookup, timeout, socketBufferSize, maxPacketSizeBytes, entityID,
bufferPoolSize, processorWorkers, senderWorkers, blocking);
} else {
return new NonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler,
staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes,
entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking);
}
}

/**
* Create dynamic lookup for the given host name and port.
*
* @param hostname
* the host name of the targeted StatsD server. If the environment variable
* "DD_AGENT_HOST" is set, this parameter is overwritten by the environment
* variable value.
* @param port
* the port of the targeted StatsD server. If the environment variable
* "DD_DOGSTATSD_PORT" is set, this parameter is overwritten by the environment
* variable value.
* @return a function to perform the lookup
*/
public static Callable<SocketAddress> volatileAddressResolution(final String hostname, final int port) {
return new Callable<SocketAddress>() {
@Override public SocketAddress call() throws UnknownHostException {
if (port == 0) { // Hostname is a file path to the socket
return new UnixSocketAddress(hostname);
} else {
return new InetSocketAddress(InetAddress.getByName(hostname), port);
}
}
};
}

/**
* Lookup the address for the given host name and cache the result.
*
* @param hostname the host name of the targeted StatsD server
* @param port the port of the targeted StatsD server
* @return a function that cached the result of the lookup
* @throws Exception if the lookup fails, i.e. {@link UnknownHostException}
*/
public static Callable<SocketAddress> staticAddressResolution(final String hostname, final int port) throws Exception {
final SocketAddress address = volatileAddressResolution(hostname, port).call();
return new Callable<SocketAddress>() {
@Override public SocketAddress call() {
return address;
}
};
}

protected static Callable<SocketAddress> staticStatsDAddressResolution(String hostname, int port) throws StatsDClientException {
try {
if (hostname == null) {
hostname = getHostnameFromEnvVar();
port = getPortFromEnvVar(port);
}

return staticAddressResolution(hostname, port);
} catch (final Exception e) {
throw new StatsDClientException("Failed to lookup StatsD host", e);
}
}

/**
* Retrieves host name from the environment variable "DD_AGENT_HOST"
*
* @return host name from the environment variable "DD_AGENT_HOST"
*
* @throws StatsDClientException if the environment variable is not set
*/
private static String getHostnameFromEnvVar() {
final String hostname = System.getenv(NonBlockingStatsDClient.DD_AGENT_HOST_ENV_VAR);
if (hostname == null) {
throw new StatsDClientException("Failed to retrieve agent hostname from environment variable", null);
}
return hostname;
}

/**
* Retrieves dogstatsd port from the environment variable "DD_DOGSTATSD_PORT"
*
* @return dogstatsd port from the environment variable "DD_DOGSTATSD_PORT"
*
* @throws StatsDClientException if the environment variable is an integer
*/
private static int getPortFromEnvVar(final int defaultPort) {
final String statsDPortString = System.getenv(NonBlockingStatsDClient.DD_DOGSTATSD_PORT_ENV_VAR);
if (statsDPortString == null) {
return defaultPort;
} else {
try {
final int statsDPort = Integer.parseInt(statsDPortString);
return statsDPort;
} catch (final NumberFormatException e) {
throw new StatsDClientException("Failed to parse "
+ NonBlockingStatsDClient.DD_DOGSTATSD_PORT_ENV_VAR + "environment variable value", e);
}
}
}


}

107 changes: 107 additions & 0 deletions src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.timgroup.statsd;

import java.nio.ByteBuffer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class StatsDBlockingProcessor extends StatsDProcessor {

private final BlockingQueue<String> messages;

StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
final int maxPacketSizeBytes, final int poolSize, final int workers)
throws Exception {

super(queueSize, handler, maxPacketSizeBytes, poolSize, workers);
this.messages = new ArrayBlockingQueue<String>(queueSize);
}

@Override
boolean send(final String message) {
try {
if (!shutdown) {
messages.put(message);
return true;
}
} catch (InterruptedException e) {
}

return false;
}

@Override
public void run() {

for (int i=0 ; i<workers ; i++) {
executor.submit(new Runnable() {
public void run() {
boolean empty;
ByteBuffer sendBuffer;

try {
sendBuffer = bufferPool.borrow();
} catch(final InterruptedException e) {
handler.handle(e);
return;
}

while (!(messages.isEmpty() && shutdown)) {

try {

if (Thread.interrupted()) {
return;
}

final String message = messages.poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS);
if (message != null) {
final byte[] data = message.getBytes(MESSAGE_CHARSET);
if (sendBuffer.capacity() < data.length) {
throw new InvalidMessageException(MESSAGE_TOO_LONG, message);
}
if (sendBuffer.remaining() < (data.length + 1)) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}
if (sendBuffer.position() > 0) {
sendBuffer.put((byte) '\n');
}
sendBuffer.put(data);
if (null == messages.peek()) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}
}
} catch (final InterruptedException e) {
if (shutdown) {
endSignal.countDown();
return;
}
} catch (final Exception e) {
handler.handle(e);
}
}
endSignal.countDown();
}
});
}

boolean done = false;
while(!done) {
try {
endSignal.await();
done = true;
} catch (final InterruptedException e) { }
}
}

boolean isShutdown() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: As shutdown is defined in StatsDProcessor, isShutdown() and shutdown() could be moved to StatsDProcessor. Same comment could also be applied for StatsDNonBlockingProcessor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, code was already defined in StatsdProcessor just forgot doing this cleanup. Thank you! Great catch again.

return shutdown;
}

void shutdown() {
shutdown = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@

/**
* Describes a handler capable of processing exceptions that occur during StatsD client operations.
*
*
* @author Tom Denley
*
*/
public interface StatsDClientErrorHandler {

/**
* Handle the given exception, which occurred during a StatsD client operation.
*
*
* Should normally be implemented as a synchronized method.
*
* @param exception
* the {@link Exception} that occurred
*/
Expand Down
Loading