Skip to content

Commit

Permalink
add checks and handling for exceeding max log size (#60)
Browse files Browse the repository at this point in the history
* add checks and handling for exceeding max log size

* updated dependencies
  • Loading branch information
tamir-michaeli authored Aug 2, 2022
1 parent 56859b4 commit 33a75a9
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 51 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ implementation 'io.logz.sender:logzio-java-sender:V1.1.2'
| **connectTimeout** | *10 * 1000* | The connection timeout during log shipment |
| **debug** | *false* | Print some debug messages to stdout to help to diagnose issues |
| **compressRequests** | *false* | Boolean. `true` if logs are compressed in gzip format before sending. `false` if logs are sent uncompressed. |
| **exceedMaxSizeAction** | `cut` | String. `cut` to truncate the message field or `drop` to drop log that exceed the allowed maximum size for logzio. If the log size exceeding the maximum size allowed after truncating the message field, the log will be dropped.|

#### Parameters for in-memory queue
| Parameter | Default | Explained |
Expand Down Expand Up @@ -94,9 +95,9 @@ public class LogzioSenderExample {
.endInMemoryQueue()
.build();

sender.start();
logzioSender.start();
JsonObject jsonMessage = createLogMessage(); // create JsonObject to send to logz.io
sender.send(jsonMessage);
logzioSender.send(jsonMessage);
}
}
```
Expand Down
5 changes: 3 additions & 2 deletions logzio-sender-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
14 changes: 11 additions & 3 deletions logzio-sender/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
Expand Down Expand Up @@ -60,6 +60,14 @@
</filters>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>

Expand All @@ -69,8 +77,8 @@
<artifactId>bigqueue</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
111 changes: 93 additions & 18 deletions logzio-sender/src/main/java/io/logz/sender/LogzioSender.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package io.logz.sender;

import com.google.common.hash.Hashing;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import io.logz.sender.exceptions.LogzioParameterErrorException;
import io.logz.sender.exceptions.LogzioServerErrorException;

import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -19,14 +18,19 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class LogzioSender {
public class LogzioSender {
private static final int MAX_SIZE_IN_BYTES = 3 * 1024 * 1024; // 3 MB

private static final int MAX_LOG_SIZE_IN_BYTES = 500000;
private static final int MAX_LOG_LINE_SIZE_IN_BYTES = 32700;
private static final String CUT_EXCEEDING_LOG = "cut";
private static final String DROP_EXCEEDING_LOG = "drop";
private static final String TRUNCATED_MESSAGE_SUFFIX = "...truncated";
private static final Map<AbstractMap.SimpleImmutableEntry<String, String>, LogzioSender> logzioSenderInstances = new HashMap<>();
private static final int FINAL_DRAIN_TIMEOUT_SEC = 20;

private final LogsQueue logsQueue;
private final int drainTimeout;
private final String exceedMaxSizeAction;
private final boolean debug;
private final SenderStatusReporter reporter;
private ScheduledExecutorService tasksExecutor;
Expand All @@ -35,14 +39,15 @@ public class LogzioSender {

private LogzioSender(HttpsRequestConfiguration httpsRequestConfiguration, int drainTimeout, boolean debug,
SenderStatusReporter reporter, ScheduledExecutorService tasksExecutor,
LogsQueue logsQueue) throws LogzioParameterErrorException {
LogsQueue logsQueue, String exceedMaxSizeAction) throws LogzioParameterErrorException {

if (logsQueue == null || reporter == null || httpsRequestConfiguration == null) {
throw new LogzioParameterErrorException("logsQueue=" + logsQueue + " reporter=" + reporter
+ " httpsRequestConfiguration=" + httpsRequestConfiguration ,
+ " httpsRequestConfiguration=" + httpsRequestConfiguration,
"For some reason could not initialize URL. Cant recover..");
}

this.exceedMaxSizeAction = validateAndGetExceedMaxSizeAction(exceedMaxSizeAction);
this.logsQueue = logsQueue;
this.drainTimeout = drainTimeout;
this.debug = debug;
Expand All @@ -52,13 +57,21 @@ private LogzioSender(HttpsRequestConfiguration httpsRequestConfiguration, int dr
debug("Created new LogzioSender class");
}

private String validateAndGetExceedMaxSizeAction(String exceedMaxSizeAction) throws LogzioParameterErrorException {
if (exceedMaxSizeAction != null && Arrays.asList("cut", "drop").contains(exceedMaxSizeAction.toLowerCase())) {
return exceedMaxSizeAction.toLowerCase();
}

throw new LogzioParameterErrorException("exceedMaxSizeAction=" + exceedMaxSizeAction, "invalid parameter value");
}

private static LogzioSender getLogzioSender(HttpsRequestConfiguration httpsRequestConfiguration, int drainTimeout, boolean debug, SenderStatusReporter reporter,
ScheduledExecutorService tasksExecutor, LogsQueue logsQueue)
ScheduledExecutorService tasksExecutor, LogsQueue logsQueue, String exceedMaxSizeAction)
throws LogzioParameterErrorException {
String tokenHash = Hashing.sha256()
.hashString(httpsRequestConfiguration.getLogzioToken(), StandardCharsets.UTF_8)
.toString()
.substring(0,7);
.substring(0, 7);
AbstractMap.SimpleImmutableEntry<String, String> tokenAndTypePair = new AbstractMap.SimpleImmutableEntry<>
(tokenHash, httpsRequestConfiguration.getLogzioType());
// We want one queue per logzio token and data type.
Expand All @@ -71,7 +84,7 @@ private static LogzioSender getLogzioSender(HttpsRequestConfiguration httpsReque
}

LogzioSender logzioSender = new LogzioSender(httpsRequestConfiguration, drainTimeout, debug, reporter,
tasksExecutor, logsQueue);
tasksExecutor, logsQueue, exceedMaxSizeAction);
logzioSenderInstances.put(tokenAndTypePair, logzioSender);
return logzioSender;
} else {
Expand Down Expand Up @@ -127,27 +140,81 @@ public void drainQueueAndSend() {
}

public void send(JsonObject jsonMessage) {
// Return the json, while separating lines with \n

// check for oversized message
int jsonByteLength = jsonMessage.toString().getBytes(StandardCharsets.UTF_8).length;
if (jsonByteLength > MAX_LOG_SIZE_IN_BYTES) {
String jsonMessageField = jsonMessage.get("message").getAsString();

// calculate the minimum between max log line size, and the message field after truncating the exceeding bytes
int truncatedMessageSize = Math.min(MAX_LOG_LINE_SIZE_IN_BYTES - TRUNCATED_MESSAGE_SUFFIX.length(),
(jsonMessageField.getBytes(StandardCharsets.UTF_8).length - (jsonByteLength - MAX_LOG_SIZE_IN_BYTES)) - TRUNCATED_MESSAGE_SUFFIX.length());

if (truncatedMessageSize <= 0 || exceedMaxSizeAction.equals(DROP_EXCEEDING_LOG)) {
debug(truncatedMessageSize <= 0 ? "Message field is empty after truncating, dropping log" : "Dropping oversized log");
return;
}

// truncate message field
String truncatedMessage = jsonMessageField.substring(0, truncatedMessageSize) + TRUNCATED_MESSAGE_SUFFIX;
jsonMessage.addProperty("message", truncatedMessage);
debug("Truncated oversized log");
}

logsQueue.enqueue(jsonMessage.toString().getBytes(StandardCharsets.UTF_8));
}


/**
* Send byte array to Logz.io
* This method is not the recommended method to use
* since it is up to the user to supply with a valid UTF8 json byte array
* representation. In any case the byte[] is not valid, the logs will not be sent.
* representation, and converting the byte array to JsonObject is necessary for log size validation.
* In any case the byte[] is not valid, the logs will not be sent.
*
* @param jsonStringAsUTF8ByteArray UTF8 byte array representation of a valid json object.
*/
public void send(byte[] jsonStringAsUTF8ByteArray) {
Gson gson = new Gson();
boolean dropLog = false;
try {
if (jsonStringAsUTF8ByteArray.length > MAX_LOG_SIZE_IN_BYTES) {
if (exceedMaxSizeAction.equals(CUT_EXCEEDING_LOG)) {
String jsonString = new String(jsonStringAsUTF8ByteArray, StandardCharsets.UTF_8);
JsonObject json = gson.fromJson(jsonString, JsonElement.class).getAsJsonObject();
String messageString = json.get("message").getAsString();
int truncatedMessageSize = Math.min(MAX_LOG_LINE_SIZE_IN_BYTES - TRUNCATED_MESSAGE_SUFFIX.length(),
(messageString.getBytes(StandardCharsets.UTF_8).length -
(jsonString.getBytes(StandardCharsets.UTF_8).length - MAX_LOG_SIZE_IN_BYTES)) - TRUNCATED_MESSAGE_SUFFIX.length());
if (truncatedMessageSize <= 0) {
dropLog = true;
} else {
String truncatedMessage = messageString.substring(0, truncatedMessageSize) + TRUNCATED_MESSAGE_SUFFIX;
json.addProperty("message", truncatedMessage);
jsonStringAsUTF8ByteArray = json.toString().getBytes(StandardCharsets.UTF_8);
debug("Truncated oversized log");
}
}
}
// Invalid json format, or truncating the message field was no
} catch (JsonSyntaxException | IndexOutOfBoundsException e) {
dropLog = true;
}

if (dropLog || exceedMaxSizeAction.equals(DROP_EXCEEDING_LOG)) {
debug(dropLog ? "Message field is empty after truncating, dropping log" : "Dropping oversized log");
return;
}
// Return the byte[], while separating lines with \n
logsQueue.enqueue(jsonStringAsUTF8ByteArray);

}

private List<FormattedLogMessage> dequeueUpToMaxBatchSize() {
List<FormattedLogMessage> logsList = new ArrayList<>();
int totalSize = 0;
while (!logsQueue.isEmpty()) {
byte[] message = logsQueue.dequeue();
byte[] message = logsQueue.dequeue();
if (message != null && message.length > 0) {
logsList.add(new FormattedLogMessage(message));
totalSize += message.length;
Expand Down Expand Up @@ -205,6 +272,12 @@ public static class Builder {
private InMemoryQueue.Builder inMemoryQueueBuilder;
private DiskQueue.Builder diskQueueBuilder;
private HttpsRequestConfiguration httpsRequestConfiguration;
private String exceedMaxSizeAction = "cut";

public Builder setExceedMaxSizeAction(String exceedMaxSizeAction) {
this.exceedMaxSizeAction = exceedMaxSizeAction;
return this;
}

public Builder setDrainTimeoutSec(int drainTimeoutSec) {
this.drainTimeoutSec = drainTimeoutSec;
Expand Down Expand Up @@ -255,13 +328,14 @@ void setInMemoryQueueBuilder(InMemoryQueue.Builder inMemoryQueueBuilder) {
}

public LogzioSender build() throws LogzioParameterErrorException {
return getLogzioSender(
return getLogzioSender(
httpsRequestConfiguration,
drainTimeoutSec,
debug,
reporter,
tasksExecutor,
getLogsQueue()
getLogsQueue(),
exceedMaxSizeAction
);
}

Expand All @@ -275,9 +349,10 @@ private LogsQueue getLogsQueue() throws LogzioParameterErrorException {
inMemoryQueueBuilder.setReporter(reporter);
return inMemoryQueueBuilder.build();
}

}

public static Builder builder(){
public static Builder builder() {
return new Builder();
}

Expand Down
9 changes: 6 additions & 3 deletions logzio-sender/src/test/java/io/logz/sender/DiskQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
import io.logz.sender.LogzioSender.Builder;
import io.logz.sender.exceptions.LogzioParameterErrorException;
import io.logz.test.TestEnvironment;
import org.junit.Test;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static io.logz.sender.LogzioTestSenderUtil.createJsonMessage;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;


public class DiskQueueTest extends LogzioSenderTest {
private final static int FS_PERCENT_THRESHOLD = 98;
Expand Down
22 changes: 13 additions & 9 deletions logzio-sender/src/test/java/io/logz/sender/InMemoryQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import com.google.gson.JsonObject;
import io.logz.sender.LogzioSender.Builder;
import io.logz.sender.exceptions.LogzioParameterErrorException;
import org.junit.Test;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
Expand All @@ -16,8 +18,8 @@ public class InMemoryQueueTest extends LogzioSenderTest {

@Override
protected Builder getLogzioSenderBuilder(String token, String type, Integer drainTimeout,
Integer socketTimeout, Integer serverTimeout,
ScheduledExecutorService tasks, boolean compressRequests)
Integer socketTimeout, Integer serverTimeout,
ScheduledExecutorService tasks, boolean compressRequests)
throws LogzioParameterErrorException {

Builder logzioSenderBuilder = super.getLogzioSenderBuilder(token, type, drainTimeout,
Expand All @@ -35,14 +37,14 @@ protected void setZeroThresholdQueue(Builder logzioSenderBuilder) {
private void setCapacityInBytes(Builder logzioSenderBuilder, long capacityInBytes) {
logzioSenderBuilder
.withInMemoryQueue()
.setCapacityInBytes(capacityInBytes)
.setCapacityInBytes(capacityInBytes)
.endInMemoryQueue();
}

private void setLogsCountLimit(Builder logzioSenderBuilder, long logsCounterLimit) {
logzioSenderBuilder
.withInMemoryQueue()
.setLogsCountLimit(logsCounterLimit)
.setLogsCountLimit(logsCounterLimit)
.endInMemoryQueue();
}

Expand All @@ -60,14 +62,14 @@ public void checkCapacityReachedToSizeBelowCapacity() throws LogzioParameterErro
int logSize = log.toString().getBytes(StandardCharsets.UTF_8).length;
ScheduledExecutorService tasks = Executors.newScheduledThreadPool(3);

Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000,
Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000,
10 * 1000, tasks, false);
setCapacityInBytes(testSenderBuilder, logSize * successfulLogs);

LogzioSender testSender = createLogzioSender(testSenderBuilder);

sleepSeconds(drainTimeout - 1);
for(int i = 0; i <= successfulLogs; i++) {
for (int i = 0; i <= successfulLogs; i++) {
testSender.send(log);
}

Expand All @@ -94,14 +96,14 @@ public void checkLogMessageCountLimitWithCapacityInBytes() throws LogzioParamete

ScheduledExecutorService tasks = Executors.newScheduledThreadPool(3);

Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000,
Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000,
10 * 1000, tasks, false);
setLogsCountLimit(testSenderBuilder, successfulLogs);

LogzioSender testSender = createLogzioSender(testSenderBuilder);

sleepSeconds(drainTimeout - 1);
for(int i = 0; i <= successfulLogs; i++) {
for (int i = 0; i <= successfulLogs; i++) {
testSender.send(log);
}

Expand All @@ -114,5 +116,7 @@ public void checkLogMessageCountLimitWithCapacityInBytes() throws LogzioParamete
mockListener.assertNumberOfReceivedMsgs(successfulLogs + 1);
tasks.shutdownNow();
}


}

Loading

0 comments on commit 33a75a9

Please sign in to comment.