Skip to content

Commit

Permalink
fix: fix typo and handle json messages on log dlq writer (#180)
Browse files Browse the repository at this point in the history
* fix: fix typo and handle json messages on log dlq writer

* test: add logDlqwriter unit test
  • Loading branch information
jesrypandawa authored and lavkesh committed Jul 7, 2022
1 parent 7e22028 commit 819b34d
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ private Sink createSink(Tracer tracer, SinkFactory sinkFactory) {
Sink baseSink = sinkFactory.getSink();
Sink sinkWithFailHandler = new SinkWithFailHandler(baseSink, errorHandler);
Sink sinkWithRetry = withRetry(sinkWithFailHandler, errorHandler);
Sink sinWithDLQ = withDlq(sinkWithRetry, tracer, errorHandler);
return new SinkFinal(sinWithDLQ, new FirehoseInstrumentation(statsDReporter, SinkFinal.class));
Sink sinkWithDLQ = withDlq(sinkWithRetry, tracer, errorHandler);
return new SinkFinal(sinkWithDLQ, new FirehoseInstrumentation(statsDReporter, SinkFinal.class));
}

public Sink withDlq(Sink sink, Tracer tracer, ErrorHandler errorHandler) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/odpf/firehose/sink/dlq/log/LogDlqWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public LogDlqWriter(FirehoseInstrumentation firehoseInstrumentation) {
@Override
public List<Message> write(List<Message> messages) throws IOException {
for (Message message : messages) {
String key = new String(message.getLogKey());
String value = new String(message.getLogMessage());
String key = message.getLogKey() == null ? "" : new String(message.getLogKey());
String value = message.getLogMessage() == null ? "" : new String(message.getLogMessage());

String error = "";
ErrorInfo errorInfo = message.getErrorInfo();
Expand Down
108 changes: 108 additions & 0 deletions src/test/java/io/odpf/firehose/sink/dlq/LogDlqWriterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.odpf.firehose.sink.dlq;

import io.odpf.depot.error.ErrorInfo;
import io.odpf.depot.error.ErrorType;
import io.odpf.firehose.message.Message;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
import io.odpf.firehose.sink.dlq.log.LogDlqWriter;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;

@RunWith(MockitoJUnitRunner.class)
public class LogDlqWriterTest {

@Mock
private FirehoseInstrumentation firehoseInstrumentation;

private LogDlqWriter logDlqWriter;

@Before
public void setUp() throws Exception {
logDlqWriter = new LogDlqWriter(firehoseInstrumentation);
}

@Test
public void shouldWriteMessagesToLog() throws IOException {
long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR));

String key = new String(message.getLogKey());
String value = new String(message.getLogMessage());
ErrorInfo errorInfo = message.getErrorInfo();
String error = ExceptionUtils.getStackTrace(errorInfo.getException());

List<Message> messages = Collections.singletonList(message);
Assert.assertEquals(0, logDlqWriter.write(messages).size());

Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, value, error);
}

@Test
public void shouldWriteMessagesToLogWhenKeyIsNull() throws IOException {
long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
Message message = new Message(null, "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR));

String value = new String(message.getLogMessage());
ErrorInfo errorInfo = message.getErrorInfo();
String error = ExceptionUtils.getStackTrace(errorInfo.getException());

List<Message> messages = Collections.singletonList(message);
Assert.assertEquals(0, logDlqWriter.write(messages).size());

Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", "", value, error);
}

@Test
public void shouldWriteMessagesToLogWhenValueIsNull() throws IOException {
long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
Message message = new Message("123".getBytes(), null, "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR));

String key = new String(message.getLogKey());
ErrorInfo errorInfo = message.getErrorInfo();
String error = ExceptionUtils.getStackTrace(errorInfo.getException());

List<Message> messages = Collections.singletonList(message);
Assert.assertEquals(0, logDlqWriter.write(messages).size());

Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, "", error);
}

@Test
public void shouldWriteMessagesToLogWhenErrorInfoIsNull() throws IOException {
long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, null);

String key = new String(message.getLogKey());
String value = new String(message.getLogMessage());

List<Message> messages = Collections.singletonList(message);
Assert.assertEquals(0, logDlqWriter.write(messages).size());

Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, value, "");
}

@Test
public void shouldWriteMessagesToLogWhenErrorInfoExceptionIsNull() throws IOException {
long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR));

String key = new String(message.getLogKey());
String value = new String(message.getLogMessage());

List<Message> messages = Collections.singletonList(message);
Assert.assertEquals(0, logDlqWriter.write(messages).size());

Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, value, "");
}
}

0 comments on commit 819b34d

Please sign in to comment.