From 819b34d488e93b5acc14c61598c9db72663fd901 Mon Sep 17 00:00:00 2001 From: jesrypandawa <78069094+jesrypandawa@users.noreply.github.com> Date: Wed, 6 Jul 2022 22:31:28 +0700 Subject: [PATCH] fix: fix typo and handle json messages on log dlq writer (#180) * fix: fix typo and handle json messages on log dlq writer * test: add logDlqwriter unit test --- .../consumer/FirehoseConsumerFactory.java | 4 +- .../firehose/sink/dlq/log/LogDlqWriter.java | 4 +- .../firehose/sink/dlq/LogDlqWriterTest.java | 108 ++++++++++++++++++ 3 files changed, 112 insertions(+), 4 deletions(-) create mode 100644 src/test/java/io/odpf/firehose/sink/dlq/LogDlqWriterTest.java diff --git a/src/main/java/io/odpf/firehose/consumer/FirehoseConsumerFactory.java b/src/main/java/io/odpf/firehose/consumer/FirehoseConsumerFactory.java index 1eade3ec8..d17e04616 100644 --- a/src/main/java/io/odpf/firehose/consumer/FirehoseConsumerFactory.java +++ b/src/main/java/io/odpf/firehose/consumer/FirehoseConsumerFactory.java @@ -161,8 +161,8 @@ private Sink createSink(Tracer tracer, SinkFactory sinkFactory) { Sink baseSink = sinkFactory.getSink(); Sink sinkWithFailHandler = new SinkWithFailHandler(baseSink, errorHandler); Sink sinkWithRetry = withRetry(sinkWithFailHandler, errorHandler); - Sink sinWithDLQ = withDlq(sinkWithRetry, tracer, errorHandler); - return new SinkFinal(sinWithDLQ, new FirehoseInstrumentation(statsDReporter, SinkFinal.class)); + Sink sinkWithDLQ = withDlq(sinkWithRetry, tracer, errorHandler); + return new SinkFinal(sinkWithDLQ, new FirehoseInstrumentation(statsDReporter, SinkFinal.class)); } public Sink withDlq(Sink sink, Tracer tracer, ErrorHandler errorHandler) { diff --git a/src/main/java/io/odpf/firehose/sink/dlq/log/LogDlqWriter.java b/src/main/java/io/odpf/firehose/sink/dlq/log/LogDlqWriter.java index 595c71725..3923b5ad6 100644 --- a/src/main/java/io/odpf/firehose/sink/dlq/log/LogDlqWriter.java +++ b/src/main/java/io/odpf/firehose/sink/dlq/log/LogDlqWriter.java @@ -20,8 +20,8 @@ public LogDlqWriter(FirehoseInstrumentation firehoseInstrumentation) { @Override public List write(List messages) throws IOException { for (Message message : messages) { - String key = new String(message.getLogKey()); - String value = new String(message.getLogMessage()); + String key = message.getLogKey() == null ? "" : new String(message.getLogKey()); + String value = message.getLogMessage() == null ? "" : new String(message.getLogMessage()); String error = ""; ErrorInfo errorInfo = message.getErrorInfo(); diff --git a/src/test/java/io/odpf/firehose/sink/dlq/LogDlqWriterTest.java b/src/test/java/io/odpf/firehose/sink/dlq/LogDlqWriterTest.java new file mode 100644 index 000000000..1287f865d --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/dlq/LogDlqWriterTest.java @@ -0,0 +1,108 @@ +package io.odpf.firehose.sink.dlq; + +import io.odpf.depot.error.ErrorInfo; +import io.odpf.depot.error.ErrorType; +import io.odpf.firehose.message.Message; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.dlq.log.LogDlqWriter; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.List; + +@RunWith(MockitoJUnitRunner.class) +public class LogDlqWriterTest { + + @Mock + private FirehoseInstrumentation firehoseInstrumentation; + + private LogDlqWriter logDlqWriter; + + @Before + public void setUp() throws Exception { + logDlqWriter = new LogDlqWriter(firehoseInstrumentation); + } + + @Test + public void shouldWriteMessagesToLog() throws IOException { + long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + + String key = new String(message.getLogKey()); + String value = new String(message.getLogMessage()); + ErrorInfo errorInfo = message.getErrorInfo(); + String error = ExceptionUtils.getStackTrace(errorInfo.getException()); + + List messages = Collections.singletonList(message); + Assert.assertEquals(0, logDlqWriter.write(messages).size()); + + Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, value, error); + } + + @Test + public void shouldWriteMessagesToLogWhenKeyIsNull() throws IOException { + long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); + Message message = new Message(null, "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + + String value = new String(message.getLogMessage()); + ErrorInfo errorInfo = message.getErrorInfo(); + String error = ExceptionUtils.getStackTrace(errorInfo.getException()); + + List messages = Collections.singletonList(message); + Assert.assertEquals(0, logDlqWriter.write(messages).size()); + + Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", "", value, error); + } + + @Test + public void shouldWriteMessagesToLogWhenValueIsNull() throws IOException { + long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); + Message message = new Message("123".getBytes(), null, "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + + String key = new String(message.getLogKey()); + ErrorInfo errorInfo = message.getErrorInfo(); + String error = ExceptionUtils.getStackTrace(errorInfo.getException()); + + List messages = Collections.singletonList(message); + Assert.assertEquals(0, logDlqWriter.write(messages).size()); + + Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, "", error); + } + + @Test + public void shouldWriteMessagesToLogWhenErrorInfoIsNull() throws IOException { + long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, null); + + String key = new String(message.getLogKey()); + String value = new String(message.getLogMessage()); + + List messages = Collections.singletonList(message); + Assert.assertEquals(0, logDlqWriter.write(messages).size()); + + Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, value, ""); + } + + @Test + public void shouldWriteMessagesToLogWhenErrorInfoExceptionIsNull() throws IOException { + long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR)); + + String key = new String(message.getLogKey()); + String value = new String(message.getLogMessage()); + + List messages = Collections.singletonList(message); + Assert.assertEquals(0, logDlqWriter.write(messages).size()); + + Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, value, ""); + } +}