Skip to content

Commit

Permalink
feat: json support for bq sink (#173)
Browse files Browse the repository at this point in the history
* feat: json support for bq sink

* feat: add/update depot configs in sinkFactoryUtils

* chore: version bump and removing json related configs

* feat: add json schema type

* fix: fix typo and handle json messages on log dlq writer (#180)

* fix: fix typo and handle json messages on log dlq writer

* test: add logDlqwriter unit test

* chore: version bump

Co-authored-by: kevin.bheda <[email protected]>
Co-authored-by: mayur.gubrele <[email protected]>
Co-authored-by: lavkesh <[email protected]>
Co-authored-by: jesrypandawa <[email protected]>
  • Loading branch information
5 people authored Jul 27, 2022
1 parent d2a9a25 commit abdb7da
Show file tree
Hide file tree
Showing 15 changed files with 224 additions and 87 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ dependencies {
implementation 'com.google.cloud:google-cloud-storage:1.114.0'
implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
implementation group: 'io.odpf', name: 'depot', version: '0.1.5'
implementation group: 'io.odpf', name: 'depot', version: '0.1.6'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'

testImplementation group: 'junit', name: 'junit', version: '4.11'
Expand Down
Binary file modified docs/docs/advance/generic.md
Binary file not shown.
19 changes: 7 additions & 12 deletions src/main/java/io/odpf/firehose/config/AppConfig.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.odpf.firehose.config;

import io.odpf.firehose.config.converter.InputSchemaTypeConverter;
import io.odpf.firehose.config.converter.ProtoIndexToFieldMapConverter;
import io.odpf.firehose.config.converter.SchemaRegistryHeadersConverter;
import io.odpf.firehose.config.converter.SchemaRegistryRefreshConverter;
import io.odpf.firehose.config.converter.SinkTypeConverter;
import io.odpf.firehose.config.enums.InputSchemaType;
import io.odpf.firehose.config.enums.SinkType;
import io.odpf.stencil.cache.SchemaRefreshStrategy;

Expand All @@ -15,18 +17,6 @@

public interface AppConfig extends Config {

@Key("METRIC_STATSD_HOST")
@DefaultValue("localhost")
String getMetricStatsDHost();

@Key("METRIC_STATSD_PORT")
@DefaultValue("8125")
Integer getMetricStatsDPort();

@Key("METRIC_STATSD_TAGS")
@DefaultValue("")
String getMetricStatsDTags();

@Key("SINK_TYPE")
@ConverterClass(SinkTypeConverter.class)
SinkType getSinkType();
Expand Down Expand Up @@ -80,6 +70,11 @@ public interface AppConfig extends Config {
@Key("INPUT_SCHEMA_PROTO_CLASS")
String getInputSchemaProtoClass();

@Key("INPUT_SCHEMA_DATA_TYPE")
@DefaultValue("PROTOBUF")
@ConverterClass(InputSchemaTypeConverter.class)
InputSchemaType getInputSchemaType();

@Key("INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING")
@ConverterClass(ProtoIndexToFieldMapConverter.class)
Properties getInputSchemaProtoToColumnMapping();
Expand Down
61 changes: 0 additions & 61 deletions src/main/java/io/odpf/firehose/config/BigQuerySinkConfig.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.odpf.firehose.config.converter;

import io.odpf.firehose.config.enums.InputSchemaType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;

public class InputSchemaTypeConverter implements Converter<InputSchemaType> {
@Override
public InputSchemaType convert(Method method, String input) {
return InputSchemaType.valueOf(input.trim().toUpperCase());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.odpf.firehose.config.enums;

public enum InputSchemaType {
PROTOBUF,
JSON
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ private Sink createSink(Tracer tracer, SinkFactory sinkFactory) {
Sink baseSink = sinkFactory.getSink();
Sink sinkWithFailHandler = new SinkWithFailHandler(baseSink, errorHandler);
Sink sinkWithRetry = withRetry(sinkWithFailHandler, errorHandler);
Sink sinWithDLQ = withDlq(sinkWithRetry, tracer, errorHandler);
return new SinkFinal(sinWithDLQ, new FirehoseInstrumentation(statsDReporter, SinkFinal.class));
Sink sinkWithDLQ = withDlq(sinkWithRetry, tracer, errorHandler);
return new SinkFinal(sinkWithDLQ, new FirehoseInstrumentation(statsDReporter, SinkFinal.class));
}

public Sink withDlq(Sink sink, Tracer tracer, ErrorHandler errorHandler) {
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/io/odpf/firehose/sink/SinkFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.odpf.depot.bigquery.BigQuerySink;
import io.odpf.depot.bigquery.BigQuerySinkFactory;
import io.odpf.depot.config.BigQuerySinkConfig;
import io.odpf.depot.log.LogSink;
import io.odpf.depot.log.LogSinkFactory;
import io.odpf.depot.metrics.StatsDReporter;
Expand All @@ -21,6 +22,7 @@
import io.odpf.firehose.sink.prometheus.PromSinkFactory;
import io.odpf.firehose.sink.redis.RedisSinkFactory;
import io.odpf.stencil.client.StencilClient;
import org.aeonbits.owner.ConfigFactory;

import java.util.Map;

Expand Down Expand Up @@ -67,7 +69,10 @@ public void init() {
return;
case BIGQUERY:
BigquerySinkUtils.addMetadataColumns(config);
bigQuerySinkFactory = new BigQuerySinkFactory(config, statsDReporter, BigquerySinkUtils.getRowIDCreator());
bigQuerySinkFactory = new BigQuerySinkFactory(
ConfigFactory.create(BigQuerySinkConfig.class, config),
statsDReporter,
BigquerySinkUtils.getRowIDCreator());
bigQuerySinkFactory.init();
return;
default:
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/odpf/firehose/sink/SinkFactoryUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
public class SinkFactoryUtils {
protected static Map<String, String> addAdditionalConfigsForSinkConnectors(Map<String, String> env) {
Map<String, String> finalConfig = new HashMap<>(env);
finalConfig.put("SINK_CONNECTOR_SCHEMA_MESSAGE_CLASS", env.getOrDefault("INPUT_SCHEMA_PROTO_CLASS", ""));
finalConfig.put("SINK_CONNECTOR_SCHEMA_KEY_CLASS", env.getOrDefault("INPUT_SCHEMA_PROTO_CLASS", ""));
finalConfig.put("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", env.getOrDefault("INPUT_SCHEMA_PROTO_CLASS", ""));
finalConfig.put("SINK_CONNECTOR_SCHEMA_PROTO_KEY_CLASS", env.getOrDefault("INPUT_SCHEMA_PROTO_CLASS", ""));
finalConfig.put("SINK_CONNECTOR_SCHEMA_DATA_TYPE", env.getOrDefault("INPUT_SCHEMA_DATA_TYPE", "protobuf"));
finalConfig.put("SINK_METRICS_APPLICATION_PREFIX", "firehose_");
finalConfig.put("SINK_CONNECTOR_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE", env.getOrDefault("INPUT_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE", "false"));
finalConfig.put("SINK_CONNECTOR_SCHEMA_MESSAGE_MODE",
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/odpf/firehose/sink/dlq/log/LogDlqWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public LogDlqWriter(FirehoseInstrumentation firehoseInstrumentation) {
@Override
public List<Message> write(List<Message> messages) throws IOException {
for (Message message : messages) {
String key = new String(message.getLogKey());
String value = new String(message.getLogMessage());
String key = message.getLogKey() == null ? "" : new String(message.getLogKey());
String value = message.getLogMessage() == null ? "" : new String(message.getLogMessage());

String error = "";
ErrorInfo errorInfo = message.getErrorInfo();
Expand Down
25 changes: 21 additions & 4 deletions src/main/java/io/odpf/firehose/sinkdecorator/SinkWithRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static io.odpf.firehose.metrics.Metrics.RETRY_MESSAGES_TOTAL;
import static io.odpf.firehose.metrics.Metrics.RETRY_ATTEMPTS_TOTAL;
Expand Down Expand Up @@ -65,11 +66,27 @@ public List<Message> pushMessage(List<Message> inputMessages) throws IOException

private void logDebug(List<Message> messageList) throws IOException {
if (firehoseInstrumentation.isDebugEnabled()) {
List<DynamicMessage> serializedBody = new ArrayList<>();
for (Message message : messageList) {
serializedBody.add(parser.parse(message));
switch (appConfig.getInputSchemaType()) {
case PROTOBUF:
List<DynamicMessage> serializedBody = new ArrayList<>();
for (Message message : messageList) {
serializedBody.add(parser.parse(message));
}
firehoseInstrumentation.logDebug("Retry failed messages: \n{}", serializedBody.toString());
break;
case JSON:
List<String> messages = messageList.stream().map(m -> {
if (appConfig.getKafkaRecordParserMode().equals("key")) {
return new String(m.getLogKey());
} else {
return new String(m.getLogMessage());
}
}).collect(Collectors.toList());
firehoseInstrumentation.logDebug("Retry failed messages: \n{}", messages.toString());
break;
default:
throw new IllegalArgumentException("Unexpected value: " + appConfig.getInputSchemaType());
}
firehoseInstrumentation.logDebug("Retry failed messages: \n{}", serializedBody.toString());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.odpf.firehose.config.converter;

import io.odpf.firehose.config.enums.InputSchemaType;
import org.junit.Assert;
import org.junit.Test;

public class InputSchemaTypeConverterTest {

@Test
public void shouldConvertSchemaType() {
InputSchemaTypeConverter converter = new InputSchemaTypeConverter();
InputSchemaType schemaType = converter.convert(null, "PROTOBUF");
Assert.assertEquals(InputSchemaType.PROTOBUF, schemaType);
schemaType = converter.convert(null, "JSON");
Assert.assertEquals(InputSchemaType.JSON, schemaType);
}

@Test
public void shouldConvertSchemaTypeWithLowerCase() {
InputSchemaTypeConverter converter = new InputSchemaTypeConverter();
InputSchemaType schemaType = converter.convert(null, " json ");
Assert.assertEquals(InputSchemaType.JSON, schemaType);
}
}
4 changes: 2 additions & 2 deletions src/test/java/io/odpf/firehose/sink/SinkFactoryUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ public void shouldAddSinkConnectorConfigs() {
put("INPUT_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE", "true");
}};
Map<String, String> configs = SinkFactoryUtils.addAdditionalConfigsForSinkConnectors(env);
Assert.assertEquals("com.test.SomeProtoClass", configs.get("SINK_CONNECTOR_SCHEMA_MESSAGE_CLASS"));
Assert.assertEquals("com.test.SomeProtoClass", configs.get("SINK_CONNECTOR_SCHEMA_KEY_CLASS"));
Assert.assertEquals("com.test.SomeProtoClass", configs.get("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS"));
Assert.assertEquals("com.test.SomeProtoClass", configs.get("SINK_CONNECTOR_SCHEMA_PROTO_KEY_CLASS"));
Assert.assertEquals("firehose_", configs.get("SINK_METRICS_APPLICATION_PREFIX"));
Assert.assertEquals("true", configs.get("SINK_CONNECTOR_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE"));
Assert.assertEquals("LOG_MESSAGE", configs.get("SINK_CONNECTOR_SCHEMA_MESSAGE_MODE"));
Expand Down
108 changes: 108 additions & 0 deletions src/test/java/io/odpf/firehose/sink/dlq/LogDlqWriterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.odpf.firehose.sink.dlq;

import io.odpf.depot.error.ErrorInfo;
import io.odpf.depot.error.ErrorType;
import io.odpf.firehose.message.Message;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
import io.odpf.firehose.sink.dlq.log.LogDlqWriter;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;

@RunWith(MockitoJUnitRunner.class)
public class LogDlqWriterTest {

@Mock
private FirehoseInstrumentation firehoseInstrumentation;

private LogDlqWriter logDlqWriter;

@Before
public void setUp() throws Exception {
logDlqWriter = new LogDlqWriter(firehoseInstrumentation);
}

@Test
public void shouldWriteMessagesToLog() throws IOException {
long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR));

String key = new String(message.getLogKey());
String value = new String(message.getLogMessage());
ErrorInfo errorInfo = message.getErrorInfo();
String error = ExceptionUtils.getStackTrace(errorInfo.getException());

List<Message> messages = Collections.singletonList(message);
Assert.assertEquals(0, logDlqWriter.write(messages).size());

Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, value, error);
}

@Test
public void shouldWriteMessagesToLogWhenKeyIsNull() throws IOException {
long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
Message message = new Message(null, "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR));

String value = new String(message.getLogMessage());
ErrorInfo errorInfo = message.getErrorInfo();
String error = ExceptionUtils.getStackTrace(errorInfo.getException());

List<Message> messages = Collections.singletonList(message);
Assert.assertEquals(0, logDlqWriter.write(messages).size());

Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", "", value, error);
}

@Test
public void shouldWriteMessagesToLogWhenValueIsNull() throws IOException {
long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
Message message = new Message("123".getBytes(), null, "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR));

String key = new String(message.getLogKey());
ErrorInfo errorInfo = message.getErrorInfo();
String error = ExceptionUtils.getStackTrace(errorInfo.getException());

List<Message> messages = Collections.singletonList(message);
Assert.assertEquals(0, logDlqWriter.write(messages).size());

Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, "", error);
}

@Test
public void shouldWriteMessagesToLogWhenErrorInfoIsNull() throws IOException {
long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, null);

String key = new String(message.getLogKey());
String value = new String(message.getLogMessage());

List<Message> messages = Collections.singletonList(message);
Assert.assertEquals(0, logDlqWriter.write(messages).size());

Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, value, "");
}

@Test
public void shouldWriteMessagesToLogWhenErrorInfoExceptionIsNull() throws IOException {
long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR));

String key = new String(message.getLogKey());
String value = new String(message.getLogMessage());

List<Message> messages = Collections.singletonList(message);
Assert.assertEquals(0, logDlqWriter.write(messages).size());

Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("key: {}\nvalue: {}\nerror: {}", key, value, "");
}
}
Loading

0 comments on commit abdb7da

Please sign in to comment.