Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Malformed lines bug #47

Merged
merged 7 commits into from
Sep 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
language: java
jdk: oraclejdk8
jdk: openjdk8

cache:
directories:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class MockLogzioBulkListener implements Closeable {
private Queue<LogRequest> logRequests = new ConcurrentLinkedQueue<>();
private final String host;
private final int port;
private int malformedLogs = 0;

private boolean isServerTimeoutMode = false;
private boolean raiseExceptionOnLog = false;
Expand Down Expand Up @@ -86,8 +87,10 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
logger.debug("got log: {} ", line);
});
logger.debug("Total number of logRequests {} ({})", logRequests.size(), logRequests);

// Tell Jetty we are ok, and it should return 200
} catch (IllegalArgumentException e) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a Jackson exception here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? I'm catching exceptions from LogRequest function

malformedLogs++;
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
} finally {
baseRequest.setHandled(true);
}
}
Expand Down Expand Up @@ -264,6 +267,10 @@ public int getNumberOfReceivedLogs() {
return logRequests.size();
}

public int getNumberOfReceivedMalformedLogs() {
return malformedLogs;
}

public MockLogzioBulkListener.LogRequest assertLogReceivedByMessage(String message) {
Optional<MockLogzioBulkListener.LogRequest> logRequest = getLogByMessageField(message);
assertThat(logRequest.isPresent()).describedAs("Log with message '"+message+"' received").isTrue();
Expand All @@ -276,6 +283,12 @@ public void assertNumberOfReceivedMsgs(int count) {
.isEqualTo(count);
}

public void assertNumberOfReceivedMalformedMsgs(int count) {
assertThat(getNumberOfReceivedMalformedLogs())
.describedAs("Malformed messages on mock listener: {}", malformedLogs)
.isEqualTo(count);
}

public void assertLogReceivedIs(String message, String token, String type, String loggerName, String level) {
MockLogzioBulkListener.LogRequest log = assertLogReceivedByMessage(message);
assertLogReceivedIs(log, token, type, loggerName, level);
Expand Down
150 changes: 78 additions & 72 deletions logzio-sender/src/main/java/io/logz/sender/HttpsSyncSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class HttpsSyncSender {
private static final int NEW_LINE_AS_UTF8_BYTE_ARRAY_SIZE = NEW_LINE_AS_UTF8_BYTE_ARRAY.length;


public HttpsSyncSender(HttpsRequestConfiguration configuration, SenderStatusReporter reporter) {
HttpsSyncSender(HttpsRequestConfiguration configuration, SenderStatusReporter reporter) {
this.configuration = configuration;
this.reporter = reporter;
}
Expand All @@ -29,18 +29,6 @@ public HttpsRequestConfiguration getConfiguration() {
return configuration;
}

private boolean shouldRetry(int statusCode) {
boolean shouldRetry = true;
switch (statusCode) {
case HttpURLConnection.HTTP_OK:
case HttpURLConnection.HTTP_BAD_REQUEST:
case HttpURLConnection.HTTP_UNAUTHORIZED:
shouldRetry = false;
break;
}
return shouldRetry;
}

private byte[] toNewLineSeparatedByteArray(List<FormattedLogMessage> messages) {
try (ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(sizeInBytes(messages) + NEW_LINE_AS_UTF8_BYTE_ARRAY_SIZE * messages.size());
OutputStream os = configuration.isCompressRequests() ? new GZIPOutputStream(byteOutputStream) : byteOutputStream) {
Expand All @@ -63,91 +51,109 @@ private int sizeInBytes(List<FormattedLogMessage> logMessages) {
return totalSize;
}

public void sendToLogzio(List<FormattedLogMessage> messages) throws LogzioServerErrorException {
void sendToLogzio(List<FormattedLogMessage> messages) throws LogzioServerErrorException {
try {
byte[] payload = toNewLineSeparatedByteArray(messages);
int currentRetrySleep = configuration.getInitialWaitBeforeRetryMS();

for (int currTry = 1; currTry <= configuration.getMaxRetriesAttempts(); currTry++) {

boolean shouldRetry = true;
boolean retry = true;
int responseCode = 0;
String responseMessage = "";
IOException savedException = null;

try {
HttpURLConnection conn = (HttpURLConnection) configuration.getLogzioListenerUrl().openConnection();
conn.setRequestMethod(configuration.getRequestMethod());
conn.setRequestProperty("Content-length", String.valueOf(payload.length));
conn.setRequestProperty("Content-Type", "text/plain");
if (configuration.isCompressRequests()){
conn.setRequestProperty("Content-Encoding", "gzip");
}
conn.setReadTimeout(configuration.getSocketTimeout());
conn.setConnectTimeout(configuration.getConnectTimeout());
conn.setDoOutput(true);
conn.setDoInput(true);

conn.getOutputStream().write(payload);

HttpURLConnection conn = sendRequest(payload);
responseCode = conn.getResponseCode();
responseMessage = conn.getResponseMessage();

if (responseCode == HttpURLConnection.HTTP_BAD_REQUEST) {
BufferedReader bufferedReader = null;
try {
StringBuilder problemDescription = new StringBuilder();
InputStream errorStream = conn.getErrorStream();
if (errorStream != null) {
bufferedReader = new BufferedReader(new InputStreamReader((errorStream)));
bufferedReader.lines().forEach(line -> problemDescription.append("\n").append(line));
reporter.warning(String.format("Got 400 from logzio, here is the output: %s", problemDescription));
}
} finally {
if (bufferedReader != null) {
try {
bufferedReader.close();
} catch(Exception e) {}
}
}
}
if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
reporter.error("Logz.io: Got forbidden! Your token is not right. Unfortunately, dropping logs. Message: " + responseMessage);
}

shouldRetry = shouldRetry(responseCode);
retry = handleResponse(payload, responseCode, responseMessage, conn);
} catch (IOException e) {
savedException = e;
reporter.error("Got IO exception - " + e.getMessage());
}

if (!shouldRetry && responseCode == HttpURLConnection.HTTP_OK) {
reporter.info("Successfully sent bulk to logz.io, size: " + payload.length);
break;

if (retry) {
currentRetrySleep = handleRetry(currentRetrySleep, currTry, responseCode, responseMessage, savedException);
} else {
break;
}
}
} catch (InterruptedException e) {
reporter.info("Got interrupted exception");
Thread.currentThread().interrupt();
}
}

if (currTry == configuration.getMaxRetriesAttempts()){
private int handleRetry(int currentRetrySleep, int currTry, int responseCode, String responseMessage, IOException savedException) throws LogzioServerErrorException, InterruptedException {
if (currTry == configuration.getMaxRetriesAttempts()) {
if (savedException != null) {
reporter.error("Got IO exception on the last bulk try to logz.io", savedException);
}
// Giving up, something is broken on Logz.io side, we will try again later
throw new LogzioServerErrorException("Got HTTP " + responseCode + " code from logz.io, with message: " + responseMessage);
}

if (savedException != null) {
reporter.warning("Could not send log to logz.io, retry (" + currTry + "/" + configuration.getMaxRetriesAttempts() + ")");
reporter.warning("Sleeping for " + currentRetrySleep + " ms and will try again.");
Thread.sleep(currentRetrySleep);
return currentRetrySleep * 2;
}

reporter.error("Got IO exception on the last bulk try to logz.io", savedException);
}
// Giving up, something is broken on Logz.io side, we will try again later
throw new LogzioServerErrorException("Got HTTP " + responseCode + " code from logz.io, with message: " + responseMessage);
}
private boolean handleResponse(byte[] payload, int responseCode, String responseMessage, HttpURLConnection conn) {
boolean retry = false;
if (responseCode == HttpURLConnection.HTTP_BAD_REQUEST) {
String errorMessage = readErrorStream(conn);
if (errorMessage != null) {
reporter.warning(errorMessage);
}
}
else if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
reporter.error("Logz.io: Got forbidden! Your token is not right. Unfortunately, dropping logs. Message: " + responseMessage);
}
else if (responseCode == HttpURLConnection.HTTP_OK) {
reporter.info("Successfully sent bulk to logz.io, size: " + payload.length);
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps retry only if response code == 5xx?

retry = true;
}
return retry;
}

reporter.warning("Could not send log to logz.io, retry (" + currTry + "/" + configuration.getMaxRetriesAttempts()+ ")");
reporter.warning("Sleeping for " + currentRetrySleep + " ms and will try again.");
Thread.sleep(currentRetrySleep);
currentRetrySleep *= 2;
private String readErrorStream(HttpURLConnection conn) {
BufferedReader bufferedReader = null;
try {
StringBuilder problemDescription = new StringBuilder();
InputStream errorStream = conn.getErrorStream();
if (errorStream != null) {
bufferedReader = new BufferedReader(new InputStreamReader((errorStream)));
bufferedReader.lines().forEach(line -> problemDescription.append("\n").append(line));
return String.format("Got 400 from logzio, here is the output: %s", problemDescription);
}
} finally {
if (bufferedReader != null) {
try {
bufferedReader.close();
} catch (Exception ignored) {
}
}
}
return null;
}

} catch (InterruptedException e) {
reporter.info("Got interrupted exception");
Thread.currentThread().interrupt();
private HttpURLConnection sendRequest(byte[] payload) throws IOException {
HttpURLConnection conn = (HttpURLConnection) configuration.getLogzioListenerUrl().openConnection();
conn.setRequestMethod(configuration.getRequestMethod());
conn.setRequestProperty("Content-length", String.valueOf(payload.length));
conn.setRequestProperty("Content-Type", "text/plain");
if (configuration.isCompressRequests()) {
conn.setRequestProperty("Content-Encoding", "gzip");
}
conn.setReadTimeout(configuration.getSocketTimeout());
conn.setConnectTimeout(configuration.getConnectTimeout());
conn.setDoOutput(true);
conn.setDoInput(true);

conn.getOutputStream().write(payload);
return conn;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ private void drainQueue() {
List<FormattedLogMessage> logsList = dequeueUpToMaxBatchSize();
try {
httpsSyncSender.sendToLogzio(logsList);

} catch (LogzioServerErrorException e) {
debug("Could not send log to logz.io: ", e);
debug("Will retry in the next interval");
Expand Down
25 changes: 25 additions & 0 deletions logzio-sender/src/test/java/io/logz/sender/LogzioSenderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,31 @@ public void simpleAppending() throws Exception {
mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL);
}

@Test
public void malformedBulk() throws Exception {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test failed before the change and now it passes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a new test

String token = "aBcDeFgHiJkLmNoPqRsT";
String type = random(8);
String loggerName = "malformedBulk";
int drainTimeout = 1;

String message1 = "Testing.." + random(5);
String message2 = "Warning test.." + random(5);

LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout,
10 * 1000, 10 * 1000, tasks,false);
LogzioSender testSender = createLogzioSender(testSenderBuilder);

testSender.send(createJsonMessage(loggerName, message1));
testSender.send(createJsonMessage(loggerName, message2));
testSender.send("bug".getBytes(StandardCharsets.UTF_8));
sleepSeconds(drainTimeout * 5);

mockListener.assertNumberOfReceivedMsgs(2);
mockListener.assertLogReceivedIs(message1, token, type, loggerName, LOGLEVEL);
mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL);
mockListener.assertNumberOfReceivedMalformedMsgs(1);
}

@Test
public void simpleByteArrayAppending() throws Exception {
String token = "aBcDeFgHiJkLmNoPqRsT";
Expand Down