Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logs a warn message on max order overshoot #493

Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 6.8.1
- Added logging the best Netty's `maxOrder` setting when big payload trigger an unpooled allocation [#493](https://github.com/logstash-plugins/logstash-input-beats/pull/493)

## 6.8.0
- Introduce expert only `event_loop_threads` to tune netty event loop threads count [#490](https://github.com/logstash-plugins/logstash-input-beats/pull/490)

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6.8.0
6.8.1
75 changes: 71 additions & 4 deletions src/main/java/org/logstash/beats/V2Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,23 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/**
* Implementation of {@link Batch} for the v2 protocol backed by ByteBuf. *must* be released after use.
*/
public class V2Batch implements Batch {

private final static Logger logger = LogManager.getLogger(V2Batch.class);
private static volatile int LAST_NOTIFIED_MAX_ORDER = -1;
// The value 14 comes from PooledByteBufAllocator.validateAndCalculateChunkSize
private static final int NETTY_MAXIMUM_ORDER = 14;

private ByteBuf internalBuffer = PooledByteBufAllocator.DEFAULT.buffer();
private int written = 0;
private int read = 0;
Expand Down Expand Up @@ -80,25 +90,82 @@ public int getHighestSequence(){

/**
* Adds a message to the batch, which will be constructed into an actual {@link Message} lazily.
* @param sequenceNumber sequence number of the message within the batch
* @param sequenceNumber sequence number of the message within the batch
* @param buffer A ByteBuf pointing to serialized JSon
* @param size size of the serialized Json
*/
void addMessage(int sequenceNumber, ByteBuf buffer, int size) {
written++;
if (internalBuffer.writableBytes() < size + (2 * SIZE_OF_INT)){
internalBuffer.capacity(internalBuffer.capacity() + size + (2 * SIZE_OF_INT));
if (internalBuffer.writableBytes() < size + (2 * SIZE_OF_INT)) {
int requiredSize = internalBuffer.capacity() + size + (2 * SIZE_OF_INT);
eventuallyLogIdealMaxOrder(requiredSize, logger);

internalBuffer.capacity(requiredSize);
}
internalBuffer.writeInt(sequenceNumber);
internalBuffer.writeInt(size);
buffer.readBytes(internalBuffer, size);
if (sequenceNumber > highestSequence){
if (sequenceNumber > highestSequence) {
highestSequence = sequenceNumber;
}
}

// package-private for testability reasons
void eventuallyLogIdealMaxOrder(int requiredSize, Logger logger) {
int idealMaxOrder = idealMaxOrder(requiredSize);
if (idealMaxOrder <= PooledByteBufAllocator.defaultMaxOrder()) {
return;
}
if (!needsToBeLogged(idealMaxOrder)) {
return;
}

if (idealMaxOrder > NETTY_MAXIMUM_ORDER) {
logger.error("Received batch of size {} bytes that is too large to fit into the pre-allocated memory pool. Reduce the size of the batch to improve performance and avoid data loss.", requiredSize);
} else {
logger.warn("Received batch of size {} bytes that is too large to fit into the pre-allocated memory pool. This will cause a performance degradation. Set 'io.netty.allocator.maxOrder' JVM property to {} to accommodate batches bigger than {} bytes.",
requiredSize, idealMaxOrder, PooledByteBufAllocator.DEFAULT.metric().chunkSize());
}
trackAsAlreadyLogged(idealMaxOrder);
}

private void trackAsAlreadyLogged(int maxOrder) {
LAST_NOTIFIED_MAX_ORDER = capMaxOrder(maxOrder);
}

private boolean needsToBeLogged(int maxOrder) {
return capMaxOrder(maxOrder) > LAST_NOTIFIED_MAX_ORDER;
}

private static int capMaxOrder(int maxOrder) {
return maxOrder > NETTY_MAXIMUM_ORDER ? Integer.MAX_VALUE : maxOrder;
}

/**
* Return the ideal maxOrder value to configure chunks of size where a buffer of requiredSize can fit.
* */
private int idealMaxOrder(int requiredSize) {
int chunkSize = PooledByteBufAllocator.DEFAULT.metric().chunkSize();
int defaultMaxOrder = PooledByteBufAllocator.defaultMaxOrder();
int defaultPageSize = PooledByteBufAllocator.defaultPageSize();
if (requiredSize > chunkSize) {
int nextMaxOrder = defaultMaxOrder;
do {
nextMaxOrder ++;
} while (requiredSize > (defaultPageSize << nextMaxOrder));
return nextMaxOrder;
} else {
return defaultMaxOrder;
}
}

@Override
public void release() {
internalBuffer.release();
}

// visible for testing
static void resetReportedOrders() {
LAST_NOTIFIED_MAX_ORDER = -1;
}
}
156 changes: 156 additions & 0 deletions src/test/java/org/logstash/beats/SpyLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package org.logstash.beats;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.util.MessageSupplier;
import org.apache.logging.log4j.util.Supplier;

import static org.hamcrest.Matchers.isEmptyOrNullString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

final class SpyLogger extends org.apache.logging.log4j.core.Logger {
// store last message received by the spy
private String loggedMessage;
// store last received level
private Level loggedLevel;

public SpyLogger(Logger sample) {
super(new LoggerContext("spylogger"), sample.getName(), sample.getMessageFactory());
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker,
final MessageSupplier messageSupplier, final Throwable throwable) {
loggedMessage = messageSupplier.get().toString();
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final Object message,
final Throwable throwable) {
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final Supplier<?> messageSupplier,
final Throwable throwable) {
loggedMessage = messageSupplier.get().toString();
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message,
final Supplier<?>... paramSuppliers) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message,
final Object... params) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(String fqcn, Level level, Marker marker, String message, Object p0) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message,
final Object p0, final Object p1, final Object p2, final Object p3, final Object p4, final Object p5) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message,
final Object p0, final Object p1, final Object p2, final Object p3, final Object p4, final Object p5,
final Object p6) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message,
final Object p0, final Object p1, final Object p2, final Object p3, final Object p4, final Object p5,
final Object p6, final Object p7) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message,
final Object p0, final Object p1, final Object p2, final Object p3, final Object p4, final Object p5,
final Object p6, final Object p7, final Object p8) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message,
final Object p0, final Object p1, final Object p2, final Object p3, final Object p4, final Object p5,
final Object p6, final Object p7, final Object p8, final Object p9) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message,
final Throwable throwable) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message,
final Object p0, final Object p1, final Object p2, final Object p3, final Object p4) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message,
final Object p0, final Object p1, final Object p2, final Object p3) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message,
final Object p0, final Object p1, final Object p2) {
loggedMessage = message;
loggedLevel = level;
}

@Override
public void logIfEnabled(final String fqcn, final Level level, final Marker marker, final String message,
final Object p0, final Object p1) {
loggedMessage = message;
loggedLevel = level;
}

void verifyLogMessage(String assertionMessage, String expectedMessage) {
assertEquals(assertionMessage, expectedMessage, loggedMessage);
loggedMessage = null;
}

void verifyLevel(Level expectedLevel) {
assertEquals(expectedLevel, loggedLevel);
loggedLevel = null;
}

void verifyNoLog(String reason) {
assertThat(reason, loggedMessage, isEmptyOrNullString());
}
}
70 changes: 70 additions & 0 deletions src/test/java/org/logstash/beats/V2BatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.security.SecureRandom;
Expand All @@ -16,7 +22,19 @@
import static org.junit.Assert.assertTrue;

public class V2BatchTest {
private final static Logger logger = LogManager.getLogger(V2BatchTest.class);
public final static ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
private SpyLogger loggerSpy;

@Before
public void setUp() {
loggerSpy = new SpyLogger(logger);
}

@After
public void tearDown() {
V2Batch.resetReportedOrders();
}

@Test
public void testIsEmpty() {
Expand Down Expand Up @@ -114,4 +132,56 @@ public static ByteBuf messageContents() {
throw new RuntimeException(e);
}
}

@Test
public void givenBufferSizeThatFitIntoActualMaxOrderThenNoLogLineIsPrinted() {
V2Batch sut = new V2Batch();

sut.eventuallyLogIdealMaxOrder(1024 * 1024, loggerSpy);

loggerSpy.verifyNoLog("No logging when the buffer size fit into actual maxOrder");
}

@Test
public void givenBufferSizeThatDoesntFitIntoActualMaxOrderThenLogLineIsPrintedJustOnce() {
V2Batch sut = new V2Batch();
int actualChunkSize = PooledByteBufAllocator.DEFAULT.metric().chunkSize();
sut.eventuallyLogIdealMaxOrder(actualChunkSize + 1024, loggerSpy);

loggerSpy.verifyLogMessage("First time the chunk size is passed a log line is printed",
"Received batch of size {} bytes that is too large to fit into the pre-allocated memory pool. This will cause a performance degradation. Set 'io.netty.allocator.maxOrder' JVM property to {} to accommodate batches bigger than {} bytes.");
loggerSpy.verifyLevel(Level.WARN);

sut.eventuallyLogIdealMaxOrder(actualChunkSize + 1024, loggerSpy);
loggerSpy.verifyNoLog("Second time the same chunk size is passed, no log happens");
}

@Test
public void givenBufferSizeBiggerThanMaximumNettyChunkSizeThenSpecificErrorLineIsLogged() {
V2Batch sut = new V2Batch();
int maxChunkSize = PooledByteBufAllocator.defaultPageSize() << 14;
sut.eventuallyLogIdealMaxOrder(maxChunkSize + 1024, loggerSpy);

loggerSpy.verifyLogMessage("Error message to be over the maximum Netty chunk size is printed",
"Received batch of size {} bytes that is too large to fit into the pre-allocated memory pool. Reduce the size of the batch to improve performance and avoid data loss.");
loggerSpy.verifyLevel(Level.ERROR);
}

@Test
public void givenWarningLogAlreadyPrintedForMaxOrderThenAnyOtherIdealMaxOrderMinorThanThatArentPrinted() {
// actual maxOrder is 8, the system has already reported an ideal maxOrder of 11, then it wouldn't report
// any other maxOrder in the range 9..11
V2Batch sut = new V2Batch();
int maxChunkSize = PooledByteBufAllocator.defaultPageSize() << 12;
sut.eventuallyLogIdealMaxOrder(maxChunkSize, loggerSpy);

loggerSpy.verifyLogMessage("First time the chunk size is passed a log line is printed",
"Received batch of size {} bytes that is too large to fit into the pre-allocated memory pool. This will cause a performance degradation. Set 'io.netty.allocator.maxOrder' JVM property to {} to accommodate batches bigger than {} bytes.");
loggerSpy.verifyLevel(Level.WARN);

maxChunkSize = PooledByteBufAllocator.defaultPageSize() << 10;
sut.eventuallyLogIdealMaxOrder(maxChunkSize, loggerSpy);

loggerSpy.verifyNoLog("MaxOrder lover then the one already reported aren't logged");
}
}