diff --git a/build.gradle b/build.gradle
index fc6eca82d..a1f88e226 100644
--- a/build.gradle
+++ b/build.gradle
@@ -101,7 +101,7 @@ dependencies {
     implementation 'com.google.cloud:google-cloud-storage:1.114.0'
     implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
     implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
-    implementation group: 'io.odpf', name: 'depot', version: '0.1.5'
+    implementation group: 'io.odpf', name: 'depot', version: '0.1.6'
     implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59'
     exclude group: 'org.slf4j'
     testImplementation group: 'junit', name: 'junit', version: '4.11'
diff --git a/docs/docs/advance/generic.md b/docs/docs/advance/generic.md
index 93764c6dc..308e33913 100644
Binary files a/docs/docs/advance/generic.md and b/docs/docs/advance/generic.md differ
diff --git a/src/main/java/io/odpf/firehose/config/AppConfig.java b/src/main/java/io/odpf/firehose/config/AppConfig.java
index 5dd7ef43b..612dbb2a1 100644
--- a/src/main/java/io/odpf/firehose/config/AppConfig.java
+++ b/src/main/java/io/odpf/firehose/config/AppConfig.java
@@ -1,9 +1,11 @@
 package io.odpf.firehose.config;
 
+import io.odpf.firehose.config.converter.InputSchemaTypeConverter;
 import io.odpf.firehose.config.converter.ProtoIndexToFieldMapConverter;
 import io.odpf.firehose.config.converter.SchemaRegistryHeadersConverter;
 import io.odpf.firehose.config.converter.SchemaRegistryRefreshConverter;
 import io.odpf.firehose.config.converter.SinkTypeConverter;
+import io.odpf.firehose.config.enums.InputSchemaType;
 import io.odpf.firehose.config.enums.SinkType;
 
 import io.odpf.stencil.cache.SchemaRefreshStrategy;
@@ -15,18 +17,6 @@ public interface AppConfig extends Config {
 
-    @Key("METRIC_STATSD_HOST")
-    @DefaultValue("localhost")
-    String getMetricStatsDHost();
-
-    @Key("METRIC_STATSD_PORT")
-    @DefaultValue("8125")
-    Integer getMetricStatsDPort();
-
-    @Key("METRIC_STATSD_TAGS")
-    @DefaultValue("")
-    String getMetricStatsDTags();
-
     @Key("SINK_TYPE")
     @ConverterClass(SinkTypeConverter.class)
     SinkType getSinkType();
@@ -80,6 +70,11 @@ public interface AppConfig extends Config {
     @Key("INPUT_SCHEMA_PROTO_CLASS")
     String getInputSchemaProtoClass();
 
+    @Key("INPUT_SCHEMA_DATA_TYPE")
+    @DefaultValue("PROTOBUF")
+    @ConverterClass(InputSchemaTypeConverter.class)
+    InputSchemaType getInputSchemaType();
+
     @Key("INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING")
     @ConverterClass(ProtoIndexToFieldMapConverter.class)
     Properties getInputSchemaProtoToColumnMapping();
diff --git a/src/main/java/io/odpf/firehose/config/BigQuerySinkConfig.java b/src/main/java/io/odpf/firehose/config/BigQuerySinkConfig.java
deleted file mode 100644
index ef68a531e..000000000
--- a/src/main/java/io/odpf/firehose/config/BigQuerySinkConfig.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package io.odpf.firehose.config;
-
-import io.odpf.firehose.config.converter.LabelMapConverter;
-
-import java.util.Map;
-
-public interface BigQuerySinkConfig extends AppConfig {
-
-    @Key("SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID")
-    String getGCloudProjectID();
-
-    @Key("SINK_BIGQUERY_TABLE_NAME")
-    String getTableName();
-
-    @Key("SINK_BIGQUERY_DATASET_LABELS")
-    @Separator(LabelMapConverter.ELEMENT_SEPARATOR)
-    @ConverterClass(LabelMapConverter.class)
-    Map<String, String> getDatasetLabels();
-
-    @Key("SINK_BIGQUERY_TABLE_LABELS")
-    @Separator(LabelMapConverter.ELEMENT_SEPARATOR)
-    @ConverterClass(LabelMapConverter.class)
-    Map<String, String> getTableLabels();
-
-    @Key("SINK_BIGQUERY_DATASET_NAME")
-    String getDatasetName();
-
-    @Key("SINK_BIGQUERY_CREDENTIAL_PATH")
-    String getBigQueryCredentialPath();
-
-    @Key("SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE")
-    @DefaultValue("false")
-    Boolean isTablePartitioningEnabled();
-
-    @Key("SINK_BIGQUERY_TABLE_PARTITION_KEY")
-    String getTablePartitionKey();
-
-    @Key("SINK_BIGQUERY_ROW_INSERT_ID_ENABLE")
-    @DefaultValue("true")
-    Boolean isRowInsertIdEnabled();
-
-    @Key("SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS")
-    @DefaultValue("-1")
-    int getBqClientReadTimeoutMS();
-
-    @Key("SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS")
-    @DefaultValue("-1")
-    int getBqClientConnectTimeoutMS();
-
-    @Key("SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS")
-    @DefaultValue("-1")
-    Long getBigQueryTablePartitionExpiryMS();
-
-    @Key("SINK_BIGQUERY_DATASET_LOCATION")
-    @DefaultValue("asia-southeast1")
-    String getBigQueryDatasetLocation();
-
-    @DefaultValue("")
-    @Key("SINK_BIGQUERY_METADATA_NAMESPACE")
-    String getBqMetadataNamespace();
-}
diff --git a/src/main/java/io/odpf/firehose/config/converter/InputSchemaTypeConverter.java b/src/main/java/io/odpf/firehose/config/converter/InputSchemaTypeConverter.java
new file mode 100644
index 000000000..8341a6f3f
--- /dev/null
+++ b/src/main/java/io/odpf/firehose/config/converter/InputSchemaTypeConverter.java
@@ -0,0 +1,13 @@
+package io.odpf.firehose.config.converter;
+
+import io.odpf.firehose.config.enums.InputSchemaType;
+import org.aeonbits.owner.Converter;
+
+import java.lang.reflect.Method;
+
+public class InputSchemaTypeConverter implements Converter<InputSchemaType> {
+    @Override
+    public InputSchemaType convert(Method method, String input) {
+        return InputSchemaType.valueOf(input.trim().toUpperCase());
+    }
+}
diff --git a/src/main/java/io/odpf/firehose/config/enums/InputSchemaType.java b/src/main/java/io/odpf/firehose/config/enums/InputSchemaType.java
new file mode 100644
index 000000000..49fadc084
--- /dev/null
+++ b/src/main/java/io/odpf/firehose/config/enums/InputSchemaType.java
@@ -0,0 +1,6 @@
+package io.odpf.firehose.config.enums;
+
+public enum InputSchemaType {
+    PROTOBUF,
+    JSON
+}
diff --git a/src/main/java/io/odpf/firehose/consumer/FirehoseConsumerFactory.java b/src/main/java/io/odpf/firehose/consumer/FirehoseConsumerFactory.java
index 1eade3ec8..d17e04616 100644
--- a/src/main/java/io/odpf/firehose/consumer/FirehoseConsumerFactory.java
+++ b/src/main/java/io/odpf/firehose/consumer/FirehoseConsumerFactory.java
@@ -161,8 +161,8 @@ private Sink createSink(Tracer tracer, SinkFactory sinkFactory) {
         Sink baseSink = sinkFactory.getSink();
         Sink sinkWithFailHandler = new SinkWithFailHandler(baseSink, errorHandler);
         Sink sinkWithRetry = withRetry(sinkWithFailHandler, errorHandler);
-        Sink sinWithDLQ = withDlq(sinkWithRetry, tracer, errorHandler);
-        return new SinkFinal(sinWithDLQ, new FirehoseInstrumentation(statsDReporter, SinkFinal.class));
+        Sink sinkWithDLQ = withDlq(sinkWithRetry, tracer, errorHandler);
+        return new SinkFinal(sinkWithDLQ, new FirehoseInstrumentation(statsDReporter, SinkFinal.class));
     }
 
     public Sink withDlq(Sink sink, Tracer tracer, ErrorHandler errorHandler) {
diff --git a/src/main/java/io/odpf/firehose/sink/SinkFactory.java b/src/main/java/io/odpf/firehose/sink/SinkFactory.java
index fa68edda8..00ee24e26 100644
--- a/src/main/java/io/odpf/firehose/sink/SinkFactory.java
+++ b/src/main/java/io/odpf/firehose/sink/SinkFactory.java
@@ -2,6 +2,7 @@
 import io.odpf.depot.bigquery.BigQuerySink;
 import io.odpf.depot.bigquery.BigQuerySinkFactory;
+import io.odpf.depot.config.BigQuerySinkConfig;
 import io.odpf.depot.log.LogSink;
 import io.odpf.depot.log.LogSinkFactory;
 import io.odpf.depot.metrics.StatsDReporter;
@@ -21,6 +22,7 @@
 import io.odpf.firehose.sink.prometheus.PromSinkFactory;
 import io.odpf.firehose.sink.redis.RedisSinkFactory;
 import io.odpf.stencil.client.StencilClient;
+import org.aeonbits.owner.ConfigFactory;
 
 import java.util.Map;
@@ -67,7 +69,10 @@ public void init() {
                 return;
             case BIGQUERY:
                 BigquerySinkUtils.addMetadataColumns(config);
-                bigQuerySinkFactory = new BigQuerySinkFactory(config, statsDReporter, BigquerySinkUtils.getRowIDCreator());
+                bigQuerySinkFactory = new BigQuerySinkFactory(
+                        ConfigFactory.create(BigQuerySinkConfig.class, config),
+                        statsDReporter,
+                        BigquerySinkUtils.getRowIDCreator());
                 bigQuerySinkFactory.init();
                 return;
             default:
diff --git a/src/main/java/io/odpf/firehose/sink/SinkFactoryUtils.java b/src/main/java/io/odpf/firehose/sink/SinkFactoryUtils.java
index f5a80ce7c..2c2b80925 100644
--- a/src/main/java/io/odpf/firehose/sink/SinkFactoryUtils.java
+++ b/src/main/java/io/odpf/firehose/sink/SinkFactoryUtils.java
@@ -8,8 +8,9 @@ public class SinkFactoryUtils {
     protected static Map<String, String> addAdditionalConfigsForSinkConnectors(Map<String, String> env) {
         Map<String, String> finalConfig = new HashMap<>(env);
-        finalConfig.put("SINK_CONNECTOR_SCHEMA_MESSAGE_CLASS", env.getOrDefault("INPUT_SCHEMA_PROTO_CLASS", ""));
-        finalConfig.put("SINK_CONNECTOR_SCHEMA_KEY_CLASS", env.getOrDefault("INPUT_SCHEMA_PROTO_CLASS", ""));
+        finalConfig.put("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", env.getOrDefault("INPUT_SCHEMA_PROTO_CLASS", ""));
+        finalConfig.put("SINK_CONNECTOR_SCHEMA_PROTO_KEY_CLASS", env.getOrDefault("INPUT_SCHEMA_PROTO_CLASS", ""));
+        finalConfig.put("SINK_CONNECTOR_SCHEMA_DATA_TYPE", env.getOrDefault("INPUT_SCHEMA_DATA_TYPE", "protobuf"));
         finalConfig.put("SINK_METRICS_APPLICATION_PREFIX", "firehose_");
         finalConfig.put("SINK_CONNECTOR_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE", env.getOrDefault("INPUT_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE", "false"));
         finalConfig.put("SINK_CONNECTOR_SCHEMA_MESSAGE_MODE",
diff --git a/src/main/java/io/odpf/firehose/sink/dlq/log/LogDlqWriter.java b/src/main/java/io/odpf/firehose/sink/dlq/log/LogDlqWriter.java
index 595c71725..3923b5ad6 100644
--- a/src/main/java/io/odpf/firehose/sink/dlq/log/LogDlqWriter.java
+++ b/src/main/java/io/odpf/firehose/sink/dlq/log/LogDlqWriter.java
@@ -20,8 +20,8 @@ public LogDlqWriter(FirehoseInstrumentation firehoseInstrumentation) {
     @Override
     public List<Message> write(List<Message> messages) throws IOException {
         for (Message message : messages) {
-            String key = new String(message.getLogKey());
-            String value = new String(message.getLogMessage());
+            String key = message.getLogKey() == null ? "" : new String(message.getLogKey());
+            String value = message.getLogMessage() == null ? "" : new String(message.getLogMessage());
"" : new String(message.getLogMessage()); String error = ""; ErrorInfo errorInfo = message.getErrorInfo(); diff --git a/src/main/java/io/odpf/firehose/sinkdecorator/SinkWithRetry.java b/src/main/java/io/odpf/firehose/sinkdecorator/SinkWithRetry.java index 60332800c..77a37aa70 100644 --- a/src/main/java/io/odpf/firehose/sinkdecorator/SinkWithRetry.java +++ b/src/main/java/io/odpf/firehose/sinkdecorator/SinkWithRetry.java @@ -16,6 +16,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static io.odpf.firehose.metrics.Metrics.RETRY_MESSAGES_TOTAL; import static io.odpf.firehose.metrics.Metrics.RETRY_ATTEMPTS_TOTAL; @@ -65,11 +66,27 @@ public List pushMessage(List inputMessages) throws IOException private void logDebug(List messageList) throws IOException { if (firehoseInstrumentation.isDebugEnabled()) { - List serializedBody = new ArrayList<>(); - for (Message message : messageList) { - serializedBody.add(parser.parse(message)); + switch (appConfig.getInputSchemaType()) { + case PROTOBUF: + List serializedBody = new ArrayList<>(); + for (Message message : messageList) { + serializedBody.add(parser.parse(message)); + } + firehoseInstrumentation.logDebug("Retry failed messages: \n{}", serializedBody.toString()); + break; + case JSON: + List messages = messageList.stream().map(m -> { + if (appConfig.getKafkaRecordParserMode().equals("key")) { + return new String(m.getLogKey()); + } else { + return new String(m.getLogMessage()); + } + }).collect(Collectors.toList()); + firehoseInstrumentation.logDebug("Retry failed messages: \n{}", messages.toString()); + break; + default: + throw new IllegalArgumentException("Unexpected value: " + appConfig.getInputSchemaType()); } - firehoseInstrumentation.logDebug("Retry failed messages: \n{}", serializedBody.toString()); } } diff --git a/src/test/java/io/odpf/firehose/config/converter/InputSchemaTypeConverterTest.java b/src/test/java/io/odpf/firehose/config/converter/InputSchemaTypeConverterTest.java new file mode 100644 index 000000000..2eb31b5e1 --- /dev/null +++ b/src/test/java/io/odpf/firehose/config/converter/InputSchemaTypeConverterTest.java @@ -0,0 +1,24 @@ +package io.odpf.firehose.config.converter; + +import io.odpf.firehose.config.enums.InputSchemaType; +import org.junit.Assert; +import org.junit.Test; + +public class InputSchemaTypeConverterTest { + + @Test + public void shouldConvertSchemaType() { + InputSchemaTypeConverter converter = new InputSchemaTypeConverter(); + InputSchemaType schemaType = converter.convert(null, "PROTOBUF"); + Assert.assertEquals(InputSchemaType.PROTOBUF, schemaType); + schemaType = converter.convert(null, "JSON"); + Assert.assertEquals(InputSchemaType.JSON, schemaType); + } + + @Test + public void shouldConvertSchemaTypeWithLowerCase() { + InputSchemaTypeConverter converter = new InputSchemaTypeConverter(); + InputSchemaType schemaType = converter.convert(null, " json "); + Assert.assertEquals(InputSchemaType.JSON, schemaType); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/SinkFactoryUtilsTest.java b/src/test/java/io/odpf/firehose/sink/SinkFactoryUtilsTest.java index 3cf07d78f..d573bd992 100644 --- a/src/test/java/io/odpf/firehose/sink/SinkFactoryUtilsTest.java +++ b/src/test/java/io/odpf/firehose/sink/SinkFactoryUtilsTest.java @@ -15,8 +15,8 @@ public void shouldAddSinkConnectorConfigs() { put("INPUT_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE", "true"); }}; Map configs = SinkFactoryUtils.addAdditionalConfigsForSinkConnectors(env); - 
Assert.assertEquals("com.test.SomeProtoClass", configs.get("SINK_CONNECTOR_SCHEMA_MESSAGE_CLASS")); - Assert.assertEquals("com.test.SomeProtoClass", configs.get("SINK_CONNECTOR_SCHEMA_KEY_CLASS")); + Assert.assertEquals("com.test.SomeProtoClass", configs.get("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS")); + Assert.assertEquals("com.test.SomeProtoClass", configs.get("SINK_CONNECTOR_SCHEMA_PROTO_KEY_CLASS")); Assert.assertEquals("firehose_", configs.get("SINK_METRICS_APPLICATION_PREFIX")); Assert.assertEquals("true", configs.get("SINK_CONNECTOR_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE")); Assert.assertEquals("LOG_MESSAGE", configs.get("SINK_CONNECTOR_SCHEMA_MESSAGE_MODE")); diff --git a/src/test/java/io/odpf/firehose/sink/dlq/LogDlqWriterTest.java b/src/test/java/io/odpf/firehose/sink/dlq/LogDlqWriterTest.java new file mode 100644 index 000000000..1287f865d --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/dlq/LogDlqWriterTest.java @@ -0,0 +1,108 @@ +package io.odpf.firehose.sink.dlq; + +import io.odpf.depot.error.ErrorInfo; +import io.odpf.depot.error.ErrorType; +import io.odpf.firehose.message.Message; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.dlq.log.LogDlqWriter; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.List; + +@RunWith(MockitoJUnitRunner.class) +public class LogDlqWriterTest { + + @Mock + private FirehoseInstrumentation firehoseInstrumentation; + + private LogDlqWriter logDlqWriter; + + @Before + public void setUp() throws Exception { + logDlqWriter = new LogDlqWriter(firehoseInstrumentation); + } + + @Test + public void shouldWriteMessagesToLog() throws IOException { + long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + + String key = new String(message.getLogKey()); + String value = new String(message.getLogMessage()); + ErrorInfo errorInfo = message.getErrorInfo(); + String error = ExceptionUtils.getStackTrace(errorInfo.getException()); + + List messages = Collections.singletonList(message); + Assert.assertEquals(0, logDlqWriter.write(messages).size()); + + Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, value, error); + } + + @Test + public void shouldWriteMessagesToLogWhenKeyIsNull() throws IOException { + long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); + Message message = new Message(null, "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + + String value = new String(message.getLogMessage()); + ErrorInfo errorInfo = message.getErrorInfo(); + String error = ExceptionUtils.getStackTrace(errorInfo.getException()); + + List messages = Collections.singletonList(message); + Assert.assertEquals(0, logDlqWriter.write(messages).size()); + + Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", "", value, error); + } + + @Test + public void shouldWriteMessagesToLogWhenValueIsNull() throws 
+    public void shouldWriteMessagesToLogWhenValueIsNull() throws IOException {
+        long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
+        Message message = new Message("123".getBytes(), null, "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR));
+
+        String key = new String(message.getLogKey());
+        ErrorInfo errorInfo = message.getErrorInfo();
+        String error = ExceptionUtils.getStackTrace(errorInfo.getException());
+
+        List<Message> messages = Collections.singletonList(message);
+        Assert.assertEquals(0, logDlqWriter.write(messages).size());
+
+        Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, "", error);
+    }
+
+    @Test
+    public void shouldWriteMessagesToLogWhenErrorInfoIsNull() throws IOException {
+        long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
+        Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, null);
+
+        String key = new String(message.getLogKey());
+        String value = new String(message.getLogMessage());
+
+        List<Message> messages = Collections.singletonList(message);
+        Assert.assertEquals(0, logDlqWriter.write(messages).size());
+
+        Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, value, "");
+    }
+
+    @Test
+    public void shouldWriteMessagesToLogWhenErrorInfoExceptionIsNull() throws IOException {
+        long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
+        Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR));
+
+        String key = new String(message.getLogKey());
+        String value = new String(message.getLogMessage());
+
+        List<Message> messages = Collections.singletonList(message);
+        Assert.assertEquals(0, logDlqWriter.write(messages).size());
+
+        Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, value, "");
+    }
+}
diff --git a/src/test/java/io/odpf/firehose/sinkdecorator/SinkWithRetryTest.java b/src/test/java/io/odpf/firehose/sinkdecorator/SinkWithRetryTest.java
index 2da50b544..f36f79999 100644
--- a/src/test/java/io/odpf/firehose/sinkdecorator/SinkWithRetryTest.java
+++ b/src/test/java/io/odpf/firehose/sinkdecorator/SinkWithRetryTest.java
@@ -4,6 +4,7 @@
 import io.odpf.depot.error.ErrorType;
 import io.odpf.firehose.config.AppConfig;
 import io.odpf.firehose.config.ErrorConfig;
+import io.odpf.firehose.config.enums.InputSchemaType;
 import io.odpf.firehose.message.Message;
 import io.odpf.firehose.error.ErrorHandler;
 import io.odpf.firehose.exception.DeserializerException;
@@ -128,6 +129,7 @@ public void shouldRetryUntilSuccess() throws IOException, DeserializerException
     @Test
     public void shouldLogRetriesMessages() throws IOException, DeserializerException {
         when(appConfig.getRetryMaxAttempts()).thenReturn(10);
+        when(appConfig.getInputSchemaType()).thenReturn(InputSchemaType.PROTOBUF);
         ArrayList<Message> messages = new ArrayList<>();
         messages.add(message);
         messages.add(message);
@@ -149,6 +151,33 @@ public void shouldLogRetriesMessages() throws IOException, DeserializerException
         verify(firehoseInstrumentation, times(5)).logDebug("Retry failed messages: \n{}", "[null, null]");
     }
 
+    @Test
+    public void shouldLogRetriesMessagesForJsonInput() throws IOException, DeserializerException {
+        when(appConfig.getRetryMaxAttempts()).thenReturn(10);
+        when(appConfig.getInputSchemaType()).thenReturn(InputSchemaType.JSON);
+        when(appConfig.getKafkaRecordParserMode()).thenReturn("message");
+        when(message.getLogMessage()).thenReturn("testing message".getBytes());
+        ArrayList<Message> messages = new ArrayList<>();
+        messages.add(message);
+        messages.add(message);
+        when(message.getErrorInfo()).thenReturn(new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR));
+        when(firehoseInstrumentation.isDebugEnabled()).thenReturn(true);
+        when(sinkDecorator.pushMessage(anyList())).thenReturn(messages).thenReturn(messages).thenReturn(messages)
+                .thenReturn(messages).thenReturn(messages).thenReturn(new ArrayList<>());
+        SinkWithRetry sinkWithRetry = new SinkWithRetry(sinkDecorator, backOffProvider, firehoseInstrumentation, appConfig, parser, errorHandler);
+
+        List<Message> messageList = sinkWithRetry.pushMessage(Collections.singletonList(message));
+        assertTrue(messageList.isEmpty());
+        verify(firehoseInstrumentation, times(1)).logInfo("Maximum retry attempts: {}", 10);
+        verify(firehoseInstrumentation, times(5)).incrementCounter("firehose_retry_attempts_total");
+        verify(firehoseInstrumentation, times(1)).logInfo("Retrying messages attempt count: {}, Number of messages: {}", 1, 2);
+        verify(firehoseInstrumentation, times(1)).logInfo("Retrying messages attempt count: {}, Number of messages: {}", 2, 2);
+        verify(firehoseInstrumentation, times(1)).logInfo("Retrying messages attempt count: {}, Number of messages: {}", 3, 2);
+        verify(firehoseInstrumentation, times(1)).logInfo("Retrying messages attempt count: {}, Number of messages: {}", 4, 2);
+        verify(firehoseInstrumentation, times(1)).logInfo("Retrying messages attempt count: {}, Number of messages: {}", 5, 2);
+        verify(firehoseInstrumentation, times(5)).logDebug("Retry failed messages: \n{}", "[testing message, testing message]");
+    }
+
     @Test
     public void shouldAddInstrumentationForRetry() throws Exception {
         when(appConfig.getRetryMaxAttempts()).thenReturn(3);