diff --git a/CHANGELOG.md b/CHANGELOG.md index 91df1e32..818683b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 6.1.4 + - Fix: reduce error logging on connection resets [#424](https://github.com/logstash-plugins/logstash-input-beats/pull/424) + ## 6.1.3 - Fix: safe-guard byte buf allocation [#420](https://github.com/logstash-plugins/logstash-input-beats/pull/420) - Updated Jackson dependencies diff --git a/VERSION b/VERSION index 88d06f10..c0f25bae 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -6.1.3 +6.1.4 diff --git a/src/main/java/org/logstash/beats/BeatsHandler.java b/src/main/java/org/logstash/beats/BeatsHandler.java index 16564222..9e96bcf3 100644 --- a/src/main/java/org/logstash/beats/BeatsHandler.java +++ b/src/main/java/org/logstash/beats/BeatsHandler.java @@ -2,10 +2,10 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.util.AttributeKey; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.net.InetSocketAddress; import javax.net.ssl.SSLHandshakeException; @@ -40,7 +40,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { @Override - public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exception { + public void channelRead0(ChannelHandlerContext ctx, Batch batch) { if(logger.isDebugEnabled()) { logger.debug(format("Received a new payload")); } @@ -81,19 +81,37 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E if (!(cause instanceof SSLHandshakeException)) { messageListener.onException(ctx, cause); } - final Throwable realCause = extractCause(cause, 0); - if (logger.isDebugEnabled()){ - logger.debug(format("Handling exception: " + cause + " (caused by: " + realCause + ")"), cause); + if (isNoisyException(cause)) { + if (logger.isDebugEnabled()) { + logger.info(format("closing"), cause); + } else { + logger.info(format("closing (" + cause.getMessage() + ")")); + } } else { - logger.info(format("Handling exception: " + cause + " (caused by: " + realCause + ")")); + final Throwable realCause = extractCause(cause, 0); + if (logger.isDebugEnabled()){ + logger.info(format("Handling exception: " + cause + " (caused by: " + realCause + ")"), cause); + } else { + logger.info(format("Handling exception: " + cause + " (caused by: " + realCause + ")")); + } + super.exceptionCaught(ctx, cause); } - } finally{ - super.exceptionCaught(ctx, cause); + } finally { ctx.flush(); ctx.close(); } } + private boolean isNoisyException(final Throwable ex) { + if (ex instanceof IOException) { + final String message = ex.getMessage(); + if ("Connection reset by peer".equals(message)) { + return true; + } + } + return false; + } + private boolean needAck(Message message) { return message.getSequence() == message.getBatch().getHighestSequence(); }