Skip to content

Commit

Permalink
Malformed lines bug (#47)
Browse files Browse the repository at this point in the history
* cleanup

* fix travis jdk

* make sendToLogzio readable better

* adding a test

* changing some param names

* clean imports

* fix naming
  • Loading branch information
idohalevi authored Sep 3, 2019
1 parent c890a94 commit 5e2f09f
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
language: java
jdk: oraclejdk8
jdk: openjdk8

cache:
directories:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class MockLogzioBulkListener implements Closeable {
private Queue<LogRequest> logRequests = new ConcurrentLinkedQueue<>();
private final String host;
private final int port;
private int malformedLogs = 0;

private boolean isServerTimeoutMode = false;
private boolean raiseExceptionOnLog = false;
Expand Down Expand Up @@ -86,8 +87,10 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
logger.debug("got log: {} ", line);
});
logger.debug("Total number of logRequests {} ({})", logRequests.size(), logRequests);

// Tell Jetty we are ok, and it should return 200
} catch (IllegalArgumentException e) {
malformedLogs++;
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
} finally {
baseRequest.setHandled(true);
}
}
Expand Down Expand Up @@ -264,6 +267,10 @@ public int getNumberOfReceivedLogs() {
return logRequests.size();
}

public int getNumberOfReceivedMalformedLogs() {
return malformedLogs;
}

public MockLogzioBulkListener.LogRequest assertLogReceivedByMessage(String message) {
Optional<MockLogzioBulkListener.LogRequest> logRequest = getLogByMessageField(message);
assertThat(logRequest.isPresent()).describedAs("Log with message '"+message+"' received").isTrue();
Expand All @@ -276,6 +283,12 @@ public void assertNumberOfReceivedMsgs(int count) {
.isEqualTo(count);
}

public void assertNumberOfReceivedMalformedMsgs(int count) {
assertThat(getNumberOfReceivedMalformedLogs())
.describedAs("Malformed messages on mock listener: {}", malformedLogs)
.isEqualTo(count);
}

public void assertLogReceivedIs(String message, String token, String type, String loggerName, String level) {
MockLogzioBulkListener.LogRequest log = assertLogReceivedByMessage(message);
assertLogReceivedIs(log, token, type, loggerName, level);
Expand Down
150 changes: 78 additions & 72 deletions logzio-sender/src/main/java/io/logz/sender/HttpsSyncSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class HttpsSyncSender {
private static final int NEW_LINE_AS_UTF8_BYTE_ARRAY_SIZE = NEW_LINE_AS_UTF8_BYTE_ARRAY.length;


public HttpsSyncSender(HttpsRequestConfiguration configuration, SenderStatusReporter reporter) {
HttpsSyncSender(HttpsRequestConfiguration configuration, SenderStatusReporter reporter) {
this.configuration = configuration;
this.reporter = reporter;
}
Expand All @@ -29,18 +29,6 @@ public HttpsRequestConfiguration getConfiguration() {
return configuration;
}

private boolean shouldRetry(int statusCode) {
boolean shouldRetry = true;
switch (statusCode) {
case HttpURLConnection.HTTP_OK:
case HttpURLConnection.HTTP_BAD_REQUEST:
case HttpURLConnection.HTTP_UNAUTHORIZED:
shouldRetry = false;
break;
}
return shouldRetry;
}

private byte[] toNewLineSeparatedByteArray(List<FormattedLogMessage> messages) {
try (ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(sizeInBytes(messages) + NEW_LINE_AS_UTF8_BYTE_ARRAY_SIZE * messages.size());
OutputStream os = configuration.isCompressRequests() ? new GZIPOutputStream(byteOutputStream) : byteOutputStream) {
Expand All @@ -63,91 +51,109 @@ private int sizeInBytes(List<FormattedLogMessage> logMessages) {
return totalSize;
}

public void sendToLogzio(List<FormattedLogMessage> messages) throws LogzioServerErrorException {
void sendToLogzio(List<FormattedLogMessage> messages) throws LogzioServerErrorException {
try {
byte[] payload = toNewLineSeparatedByteArray(messages);
int currentRetrySleep = configuration.getInitialWaitBeforeRetryMS();

for (int currTry = 1; currTry <= configuration.getMaxRetriesAttempts(); currTry++) {

boolean shouldRetry = true;
boolean retry = true;
int responseCode = 0;
String responseMessage = "";
IOException savedException = null;

try {
HttpURLConnection conn = (HttpURLConnection) configuration.getLogzioListenerUrl().openConnection();
conn.setRequestMethod(configuration.getRequestMethod());
conn.setRequestProperty("Content-length", String.valueOf(payload.length));
conn.setRequestProperty("Content-Type", "text/plain");
if (configuration.isCompressRequests()){
conn.setRequestProperty("Content-Encoding", "gzip");
}
conn.setReadTimeout(configuration.getSocketTimeout());
conn.setConnectTimeout(configuration.getConnectTimeout());
conn.setDoOutput(true);
conn.setDoInput(true);

conn.getOutputStream().write(payload);

HttpURLConnection conn = sendRequest(payload);
responseCode = conn.getResponseCode();
responseMessage = conn.getResponseMessage();

if (responseCode == HttpURLConnection.HTTP_BAD_REQUEST) {
BufferedReader bufferedReader = null;
try {
StringBuilder problemDescription = new StringBuilder();
InputStream errorStream = conn.getErrorStream();
if (errorStream != null) {
bufferedReader = new BufferedReader(new InputStreamReader((errorStream)));
bufferedReader.lines().forEach(line -> problemDescription.append("\n").append(line));
reporter.warning(String.format("Got 400 from logzio, here is the output: %s", problemDescription));
}
} finally {
if (bufferedReader != null) {
try {
bufferedReader.close();
} catch(Exception e) {}
}
}
}
if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
reporter.error("Logz.io: Got forbidden! Your token is not right. Unfortunately, dropping logs. Message: " + responseMessage);
}

shouldRetry = shouldRetry(responseCode);
retry = handleResponse(payload, responseCode, responseMessage, conn);
} catch (IOException e) {
savedException = e;
reporter.error("Got IO exception - " + e.getMessage());
}

if (!shouldRetry && responseCode == HttpURLConnection.HTTP_OK) {
reporter.info("Successfully sent bulk to logz.io, size: " + payload.length);
break;

if (retry) {
currentRetrySleep = handleRetry(currentRetrySleep, currTry, responseCode, responseMessage, savedException);
} else {
break;
}
}
} catch (InterruptedException e) {
reporter.info("Got interrupted exception");
Thread.currentThread().interrupt();
}
}

if (currTry == configuration.getMaxRetriesAttempts()){
private int handleRetry(int currentRetrySleep, int currTry, int responseCode, String responseMessage, IOException savedException) throws LogzioServerErrorException, InterruptedException {
if (currTry == configuration.getMaxRetriesAttempts()) {
if (savedException != null) {
reporter.error("Got IO exception on the last bulk try to logz.io", savedException);
}
// Giving up, something is broken on Logz.io side, we will try again later
throw new LogzioServerErrorException("Got HTTP " + responseCode + " code from logz.io, with message: " + responseMessage);
}

if (savedException != null) {
reporter.warning("Could not send log to logz.io, retry (" + currTry + "/" + configuration.getMaxRetriesAttempts() + ")");
reporter.warning("Sleeping for " + currentRetrySleep + " ms and will try again.");
Thread.sleep(currentRetrySleep);
return currentRetrySleep * 2;
}

reporter.error("Got IO exception on the last bulk try to logz.io", savedException);
}
// Giving up, something is broken on Logz.io side, we will try again later
throw new LogzioServerErrorException("Got HTTP " + responseCode + " code from logz.io, with message: " + responseMessage);
}
private boolean handleResponse(byte[] payload, int responseCode, String responseMessage, HttpURLConnection conn) {
boolean retry = false;
if (responseCode == HttpURLConnection.HTTP_BAD_REQUEST) {
String errorMessage = readErrorStream(conn);
if (errorMessage != null) {
reporter.warning(errorMessage);
}
}
else if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
reporter.error("Logz.io: Got forbidden! Your token is not right. Unfortunately, dropping logs. Message: " + responseMessage);
}
else if (responseCode == HttpURLConnection.HTTP_OK) {
reporter.info("Successfully sent bulk to logz.io, size: " + payload.length);
} else {
retry = true;
}
return retry;
}

reporter.warning("Could not send log to logz.io, retry (" + currTry + "/" + configuration.getMaxRetriesAttempts()+ ")");
reporter.warning("Sleeping for " + currentRetrySleep + " ms and will try again.");
Thread.sleep(currentRetrySleep);
currentRetrySleep *= 2;
private String readErrorStream(HttpURLConnection conn) {
BufferedReader bufferedReader = null;
try {
StringBuilder problemDescription = new StringBuilder();
InputStream errorStream = conn.getErrorStream();
if (errorStream != null) {
bufferedReader = new BufferedReader(new InputStreamReader((errorStream)));
bufferedReader.lines().forEach(line -> problemDescription.append("\n").append(line));
return String.format("Got 400 from logzio, here is the output: %s", problemDescription);
}
} finally {
if (bufferedReader != null) {
try {
bufferedReader.close();
} catch (Exception ignored) {
}
}
}
return null;
}

} catch (InterruptedException e) {
reporter.info("Got interrupted exception");
Thread.currentThread().interrupt();
private HttpURLConnection sendRequest(byte[] payload) throws IOException {
HttpURLConnection conn = (HttpURLConnection) configuration.getLogzioListenerUrl().openConnection();
conn.setRequestMethod(configuration.getRequestMethod());
conn.setRequestProperty("Content-length", String.valueOf(payload.length));
conn.setRequestProperty("Content-Type", "text/plain");
if (configuration.isCompressRequests()) {
conn.setRequestProperty("Content-Encoding", "gzip");
}
conn.setReadTimeout(configuration.getSocketTimeout());
conn.setConnectTimeout(configuration.getConnectTimeout());
conn.setDoOutput(true);
conn.setDoInput(true);

conn.getOutputStream().write(payload);
return conn;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ private void drainQueue() {
List<FormattedLogMessage> logsList = dequeueUpToMaxBatchSize();
try {
httpsSyncSender.sendToLogzio(logsList);

} catch (LogzioServerErrorException e) {
debug("Could not send log to logz.io: ", e);
debug("Will retry in the next interval");
Expand Down
25 changes: 25 additions & 0 deletions logzio-sender/src/test/java/io/logz/sender/LogzioSenderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,31 @@ public void simpleAppending() throws Exception {
mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL);
}

@Test
public void malformedBulk() throws Exception {
String token = "aBcDeFgHiJkLmNoPqRsT";
String type = random(8);
String loggerName = "malformedBulk";
int drainTimeout = 1;

String message1 = "Testing.." + random(5);
String message2 = "Warning test.." + random(5);

LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout,
10 * 1000, 10 * 1000, tasks,false);
LogzioSender testSender = createLogzioSender(testSenderBuilder);

testSender.send(createJsonMessage(loggerName, message1));
testSender.send(createJsonMessage(loggerName, message2));
testSender.send("bug".getBytes(StandardCharsets.UTF_8));
sleepSeconds(drainTimeout * 5);

mockListener.assertNumberOfReceivedMsgs(2);
mockListener.assertLogReceivedIs(message1, token, type, loggerName, LOGLEVEL);
mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL);
mockListener.assertNumberOfReceivedMalformedMsgs(1);
}

@Test
public void simpleByteArrayAppending() throws Exception {
String token = "aBcDeFgHiJkLmNoPqRsT";
Expand Down

0 comments on commit 5e2f09f

Please sign in to comment.