Bq sink connector using depot repo. (#168)
* feat: adding odpf/depot dependencies

* feat: add github maven repo for odpf/depot

* feat: add github auth

* chore: dependencies

* chore: version bump

* chore: version bump

* refactor: move common code to separate class
lavkesh committed Jun 27, 2022
1 parent 9a4b24c commit f914382
Showing 253 changed files with 1,842 additions and 7,074 deletions.
27 changes: 13 additions & 14 deletions build.gradle
@@ -48,7 +48,6 @@ repositories {
}
}


private Properties loadEnv() {
Properties properties = new Properties()
properties.load(new FileInputStream(file("${projectDir}/env/local.properties")));
@@ -86,7 +85,7 @@ dependencies {
exclude group: "log4j", module: "log4j"
}
implementation 'io.confluent:monitoring-interceptors:3.0.0'
implementation "io.grpc:grpc-all:1.18.0"
implementation "io.grpc:grpc-all:1.38.0"
implementation group: 'org.jfrog.buildinfo', name: 'build-info-extractor', version: '2.6.3'
implementation group: 'com.google.gradle', name: 'osdetector-gradle-plugin', version: '1.2.1'
implementation group: 'org.apache.ivy', name: 'ivy', version: '2.2.0'
@@ -102,12 +101,12 @@ dependencies {
implementation 'com.google.cloud:google-cloud-storage:1.114.0'
implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
implementation 'org.apache.logging.log4j:log4j-core:2.17.1'

implementation group: 'io.odpf', name: 'depot', version: '0.1.3'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'

testImplementation group: 'junit', name: 'junit', version: '4.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.mockito:mockito-core:2.0.99-beta'
testImplementation 'org.mockito:mockito-core:4.5.1'
testImplementation "com.github.tomakehurst:wiremock:2.3.1"
testImplementation group: 'io.opentracing', name: 'opentracing-mock', version: '0.33.0'
testImplementation group: 'org.mock-server', name: 'mockserver-netty', version: '3.10.5'
@@ -133,7 +132,7 @@ protobuf {
task.generateDescriptorSet = true
task.descriptorSetOptions.includeSourceInfo = false
task.descriptorSetOptions.includeImports = true
task.descriptorSetOptions.path = "$projectDir/src/test/resources/__files/descriptors.bin"
task.descriptorSetOptions.path = "$projectDir/src/test/resources/__files/descriptors.bin"
}
}
}
@@ -157,17 +156,17 @@ test {
clean {
delete "$projectDir/src/test/resources/__files"
}

jar {
manifest {
attributes 'Main-Class': 'io.odpf.firehose.launch.Main'
duplicatesStrategy = 'exclude'
}
from {
configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
}
manifest {
attributes 'Main-Class': 'io.odpf.firehose.launch.Main'
duplicatesStrategy = 'exclude'
zip64 = true
}
from {
configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
}
exclude('META-INF/*.RSA', 'META-INF/*.SF', 'META-INF/*.DSA')
}

publishing {
publications {
maven(MavenPublication) {
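
The commit messages above mention adding a GitHub Maven repository and auth for `io.odpf:depot`, but the corresponding `repositories` hunk is collapsed in this view; only the `implementation group: 'io.odpf', name: 'depot', version: '0.1.3'` line is visible. A minimal sketch of what such a setup typically looks like, assuming the artifact is served from GitHub Packages under the `odpf/depot` repository and that credentials come from environment variables or Gradle properties (the property names here are illustrative, not taken from this commit):

```groovy
// Illustrative sketch only -- not the actual block from this commit.
// Assumes io.odpf:depot is published to GitHub Packages under odpf/depot and that
// GITHUB_ACTOR / GITHUB_TOKEN (or gpr.user / gpr.key properties) hold credentials.
repositories {
    mavenCentral()
    maven {
        url "https://maven.pkg.github.com/odpf/depot"
        credentials {
            username = System.getenv("GITHUB_ACTOR") ?: findProperty("gpr.user")
            password = System.getenv("GITHUB_TOKEN") ?: findProperty("gpr.key")
        }
    }
}

dependencies {
    // Version as shown in the diff above.
    implementation group: 'io.odpf', name: 'depot', version: '0.1.3'
}
```
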
4 changes: 2 additions & 2 deletions docs/docs/concepts/filters.md
@@ -48,11 +48,11 @@ You can read more about JSON Schema [here](https://json-schema.org/). For more d

The filtering occurs in the following steps -

- JSON filter configurations are validated and logged to instrumentation by JsonFilterUtil. In case any configuration is invalid, then IllegalArgumentException is thrown and Firehose is terminated.
- JSON filter configurations are validated and logged to firehoseInstrumentation by JsonFilterUtil. In case any configuration is invalid, then IllegalArgumentException is thrown and Firehose is terminated.
- If `FILTER_ESB_MESSAGE_FORMAT=PROTOBUF`, then the serialized key/message protobuf byte array is deserialized to POJO object by the Proto schema class. It is then converted to a JSON string so that it can be parsed by the JSON Schema Validator.
- If `FILTER_ESB_MESSAGE_FORMAT=JSON`, then the serialized JSON byte array is deserialized to a JSON message string.
- The JSON Schema validator performs a validation on the JSON message against the filter rules specified in the JSON Schema string provided in the environment variable `FILTER_JSON_SCHEMA`.
- If there are any validation errors, then that key/message is filtered out and the validation errors are logged to the instrumentation in debug mode.
- If there are any validation errors, then that key/message is filtered out and the validation errors are logged to the firehoseInstrumentation in debug mode.
- If all validation checks pass, then the key/message is added to the ArrayList of filtered messages and returned by the JsonFilter.
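
As an illustrative sketch of the last two steps (not taken from filters.md; the field name and threshold are made up), a `FILTER_JSON_SCHEMA` such as the one below keeps only messages whose `order_amount` is at least 100, while any message missing that field or below the threshold fails validation and is filtered out:

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "order_amount": { "type": "number", "minimum": 100 }
  },
  "required": ["order_amount"]
}
```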

## Why Use Filters
2 changes: 1 addition & 1 deletion src/main/java/io/odpf/firehose/config/ErrorConfig.java
@@ -1,7 +1,7 @@
package io.odpf.firehose.config;

import io.odpf.depot.error.ErrorType;
import io.odpf.firehose.config.converter.SetErrorTypeConverter;
import io.odpf.firehose.error.ErrorType;
import org.aeonbits.owner.Config;
import org.aeonbits.owner.Mutable;

@@ -1,6 +1,6 @@
package io.odpf.firehose.config.converter;

import io.odpf.firehose.error.ErrorType;
import io.odpf.depot.error.ErrorType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;
@@ -3,10 +3,10 @@
import io.odpf.firehose.consumer.kafka.ConsumerAndOffsetManager;
import io.odpf.firehose.exception.FirehoseConsumerFailedException;
import io.odpf.firehose.message.Message;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
import io.odpf.firehose.sink.SinkPool;
import io.odpf.firehose.filter.FilterException;
import io.odpf.firehose.filter.FilteredMessages;
import io.odpf.firehose.metrics.Instrumentation;
import io.odpf.firehose.tracer.SinkTracer;
import io.opentracing.Span;
import lombok.AllArgsConstructor;
@@ -24,7 +24,7 @@ public class FirehoseAsyncConsumer implements FirehoseConsumer {
private final SinkTracer tracer;
private final ConsumerAndOffsetManager consumerAndOffsetManager;
private final FirehoseFilter firehoseFilter;
private final Instrumentation instrumentation;
private final FirehoseInstrumentation firehoseInstrumentation;

@Override
public void process() {
@@ -47,18 +47,18 @@ public void process() {
} catch (FilterException e) {
throw new FirehoseConsumerFailedException(e);
} finally {
instrumentation.captureDurationSince(SOURCE_KAFKA_PARTITIONS_PROCESS_TIME_MILLISECONDS, beforeCall);
firehoseInstrumentation.captureDurationSince(SOURCE_KAFKA_PARTITIONS_PROCESS_TIME_MILLISECONDS, beforeCall);
}
}

private Future<List<Message>> scheduleTask(List<Message> messages) {
while (true) {
Future<List<Message>> scheduledTask = sinkPool.submitTask(messages);
if (scheduledTask == null) {
instrumentation.logInfo("The Queue is full");
firehoseInstrumentation.logInfo("The Queue is full");
sinkPool.fetchFinishedSinkTasks().forEach(consumerAndOffsetManager::setCommittable);
} else {
instrumentation.logInfo("Adding sink task");
firehoseInstrumentation.logInfo("Adding sink task");
return scheduledTask;
}
}
@@ -69,6 +69,6 @@ public void close() throws IOException {
consumerAndOffsetManager.close();
tracer.close();
sinkPool.close();
instrumentation.close();
firehoseInstrumentation.close();
}
}
@@ -1,9 +1,11 @@
package io.odpf.firehose.consumer;

import io.jaegertracing.Configuration;
import io.odpf.depot.metrics.StatsDReporter;
import io.odpf.firehose.consumer.kafka.ConsumerAndOffsetManager;
import io.odpf.firehose.consumer.kafka.FirehoseKafkaConsumer;
import io.odpf.firehose.consumer.kafka.OffsetManager;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
import io.odpf.firehose.sink.SinkFactory;
import io.odpf.firehose.utils.KafkaUtils;
import io.odpf.firehose.config.AppConfig;
@@ -19,10 +21,8 @@
import io.odpf.firehose.filter.jexl.JexlFilter;
import io.odpf.firehose.filter.json.JsonFilter;
import io.odpf.firehose.filter.json.JsonFilterUtil;
import io.odpf.firehose.metrics.Instrumentation;
import io.odpf.firehose.metrics.StatsDReporter;
import io.odpf.firehose.sink.Sink;
import io.odpf.firehose.sink.log.KeyOrMessageParser;
import io.odpf.firehose.sink.common.KeyOrMessageParser;
import io.odpf.firehose.sinkdecorator.BackOff;
import io.odpf.firehose.sinkdecorator.BackOffProvider;
import io.odpf.firehose.error.ErrorHandler;
@@ -58,7 +58,7 @@ public class FirehoseConsumerFactory {
private final Map<String, String> config = System.getenv();
private final StatsDReporter statsDReporter;
private final StencilClient stencilClient;
private final Instrumentation instrumentation;
private final FirehoseInstrumentation firehoseInstrumentation;
private final KeyOrMessageParser parser;
private final OffsetManager offsetManager = new OffsetManager();

@@ -71,14 +71,14 @@ public class FirehoseConsumerFactory {
public FirehoseConsumerFactory(KafkaConsumerConfig kafkaConsumerConfig, StatsDReporter statsDReporter) {
this.kafkaConsumerConfig = kafkaConsumerConfig;
this.statsDReporter = statsDReporter;
instrumentation = new Instrumentation(this.statsDReporter, FirehoseConsumerFactory.class);
firehoseInstrumentation = new FirehoseInstrumentation(this.statsDReporter, FirehoseConsumerFactory.class);

String additionalConsumerConfig = String.format(""
+ "\n\tEnable Async Commit: %s"
+ "\n\tCommit Only Current Partition: %s",
this.kafkaConsumerConfig.isSourceKafkaAsyncCommitEnable(),
this.kafkaConsumerConfig.isSourceKafkaCommitOnlyCurrentPartitionsEnable());
instrumentation.logDebug(additionalConsumerConfig);
firehoseInstrumentation.logDebug(additionalConsumerConfig);

String stencilUrl = this.kafkaConsumerConfig.getSchemaRegistryStencilUrls();
stencilClient = this.kafkaConsumerConfig.isSchemaRegistryStencilEnable()
@@ -88,25 +88,25 @@ public FirehoseConsumerFactory(KafkaConsumerConfig kafkaConsumerConfig, StatsDRe
}

private FirehoseFilter buildFilter(FilterConfig filterConfig) {
instrumentation.logInfo("Filter Engine: {}", filterConfig.getFilterEngine());
firehoseInstrumentation.logInfo("Filter Engine: {}", filterConfig.getFilterEngine());
Filter filter;
switch (filterConfig.getFilterEngine()) {
case JSON:
Instrumentation jsonFilterUtilInstrumentation = new Instrumentation(statsDReporter, JsonFilterUtil.class);
JsonFilterUtil.logConfigs(filterConfig, jsonFilterUtilInstrumentation);
JsonFilterUtil.validateConfigs(filterConfig, jsonFilterUtilInstrumentation);
filter = new JsonFilter(stencilClient, filterConfig, new Instrumentation(statsDReporter, JsonFilter.class));
FirehoseInstrumentation jsonFilterUtilFirehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, JsonFilterUtil.class);
JsonFilterUtil.logConfigs(filterConfig, jsonFilterUtilFirehoseInstrumentation);
JsonFilterUtil.validateConfigs(filterConfig, jsonFilterUtilFirehoseInstrumentation);
filter = new JsonFilter(stencilClient, filterConfig, new FirehoseInstrumentation(statsDReporter, JsonFilter.class));
break;
case JEXL:
filter = new JexlFilter(filterConfig, new Instrumentation(statsDReporter, JexlFilter.class));
filter = new JexlFilter(filterConfig, new FirehoseInstrumentation(statsDReporter, JexlFilter.class));
break;
case NO_OP:
filter = new NoOpFilter(new Instrumentation(statsDReporter, NoOpFilter.class));
filter = new NoOpFilter(new FirehoseInstrumentation(statsDReporter, NoOpFilter.class));
break;
default:
throw new IllegalArgumentException("Invalid filter engine type");
}
return new FirehoseFilter(filter, new Instrumentation(statsDReporter, FirehoseFilter.class));
return new FirehoseFilter(filter, new FirehoseInstrumentation(statsDReporter, FirehoseFilter.class));
}

/**
@@ -128,21 +128,21 @@ public FirehoseConsumer buildConsumer() {
sinkFactory.init();
if (kafkaConsumerConfig.getSourceKafkaConsumerMode().equals(KafkaConsumerMode.SYNC)) {
Sink sink = createSink(tracer, sinkFactory);
ConsumerAndOffsetManager consumerAndOffsetManager = new ConsumerAndOffsetManager(Collections.singletonList(sink), offsetManager, firehoseKafkaConsumer, kafkaConsumerConfig, new Instrumentation(statsDReporter, ConsumerAndOffsetManager.class));
ConsumerAndOffsetManager consumerAndOffsetManager = new ConsumerAndOffsetManager(Collections.singletonList(sink), offsetManager, firehoseKafkaConsumer, kafkaConsumerConfig, new FirehoseInstrumentation(statsDReporter, ConsumerAndOffsetManager.class));
return new FirehoseSyncConsumer(
sink,
firehoseTracer,
consumerAndOffsetManager,
firehoseFilter,
new Instrumentation(statsDReporter, FirehoseSyncConsumer.class));
new FirehoseInstrumentation(statsDReporter, FirehoseSyncConsumer.class));
} else {
SinkPoolConfig sinkPoolConfig = ConfigFactory.create(SinkPoolConfig.class, config);
int nThreads = sinkPoolConfig.getSinkPoolNumThreads();
List<Sink> sinks = new ArrayList<>(nThreads);
for (int ii = 0; ii < nThreads; ii++) {
sinks.add(createSink(tracer, sinkFactory));
}
ConsumerAndOffsetManager consumerAndOffsetManager = new ConsumerAndOffsetManager(sinks, offsetManager, firehoseKafkaConsumer, kafkaConsumerConfig, new Instrumentation(statsDReporter, ConsumerAndOffsetManager.class));
ConsumerAndOffsetManager consumerAndOffsetManager = new ConsumerAndOffsetManager(sinks, offsetManager, firehoseKafkaConsumer, kafkaConsumerConfig, new FirehoseInstrumentation(statsDReporter, ConsumerAndOffsetManager.class));
SinkPool sinkPool = new SinkPool(
new LinkedBlockingQueue<>(sinks),
Executors.newCachedThreadPool(),
Expand All @@ -152,7 +152,7 @@ public FirehoseConsumer buildConsumer() {
firehoseTracer,
consumerAndOffsetManager,
firehoseFilter,
new Instrumentation(statsDReporter, FirehoseAsyncConsumer.class));
new FirehoseInstrumentation(statsDReporter, FirehoseAsyncConsumer.class));
}
}

@@ -162,7 +162,7 @@ private Sink createSink(Tracer tracer, SinkFactory sinkFactory) {
Sink sinkWithFailHandler = new SinkWithFailHandler(baseSink, errorHandler);
Sink sinkWithRetry = withRetry(sinkWithFailHandler, errorHandler);
Sink sinWithDLQ = withDlq(sinkWithRetry, tracer, errorHandler);
return new SinkFinal(sinWithDLQ, new Instrumentation(statsDReporter, SinkFinal.class));
return new SinkFinal(sinWithDLQ, new FirehoseInstrumentation(statsDReporter, SinkFinal.class));
}

public Sink withDlq(Sink sink, Tracer tracer, ErrorHandler errorHandler) {
Expand All @@ -178,7 +178,7 @@ public Sink withDlq(Sink sink, Tracer tracer, ErrorHandler errorHandler) {
backOffProvider,
dlqConfig,
errorHandler,
new Instrumentation(statsDReporter, SinkWithDlq.class));
new FirehoseInstrumentation(statsDReporter, SinkWithDlq.class));
}

/**
@@ -191,7 +191,7 @@ public Sink withDlq(Sink sink, Tracer tracer, ErrorHandler errorHandler) {
private Sink withRetry(Sink sink, ErrorHandler errorHandler) {
AppConfig appConfig = ConfigFactory.create(AppConfig.class, config);
BackOffProvider backOffProvider = getBackOffProvider();
return new SinkWithRetry(sink, backOffProvider, new Instrumentation(statsDReporter, SinkWithRetry.class), appConfig, parser, errorHandler);
return new SinkWithRetry(sink, backOffProvider, new FirehoseInstrumentation(statsDReporter, SinkWithRetry.class), appConfig, parser, errorHandler);
}

private BackOffProvider getBackOffProvider() {
@@ -200,7 +200,7 @@ private BackOffProvider getBackOffProvider() {
appConfig.getRetryExponentialBackoffInitialMs(),
appConfig.getRetryExponentialBackoffRate(),
appConfig.getRetryExponentialBackoffMaxMs(),
new Instrumentation(statsDReporter, ExponentialBackOffProvider.class),
new BackOff(new Instrumentation(statsDReporter, BackOff.class)));
new FirehoseInstrumentation(statsDReporter, ExponentialBackOffProvider.class),
new BackOff(new FirehoseInstrumentation(statsDReporter, BackOff.class)));
}
}
8 changes: 4 additions & 4 deletions src/main/java/io/odpf/firehose/consumer/FirehoseFilter.java
@@ -4,7 +4,7 @@
import io.odpf.firehose.filter.Filter;
import io.odpf.firehose.filter.FilterException;
import io.odpf.firehose.filter.FilteredMessages;
import io.odpf.firehose.metrics.Instrumentation;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
import io.odpf.firehose.metrics.Metrics;
import lombok.AllArgsConstructor;

@@ -13,14 +13,14 @@
@AllArgsConstructor
public class FirehoseFilter {
private final Filter filter;
private final Instrumentation instrumentation;
private final FirehoseInstrumentation firehoseInstrumentation;

public FilteredMessages applyFilter(List<Message> messages) throws FilterException {
FilteredMessages filteredMessage = filter.filter(messages);
int filteredMessageCount = filteredMessage.sizeOfInvalidMessages();
if (filteredMessageCount > 0) {
instrumentation.captureFilteredMessageCount(filteredMessageCount);
instrumentation.captureGlobalMessageMetrics(Metrics.MessageScope.FILTERED, filteredMessageCount);
firehoseInstrumentation.captureFilteredMessageCount(filteredMessageCount);
firehoseInstrumentation.captureGlobalMessageMetrics(Metrics.MessageScope.FILTERED, filteredMessageCount);
}
return filteredMessage;
}
@@ -5,7 +5,7 @@
import io.odpf.firehose.message.Message;
import io.odpf.firehose.filter.FilterException;
import io.odpf.firehose.filter.FilteredMessages;
import io.odpf.firehose.metrics.Instrumentation;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
import io.odpf.firehose.sink.Sink;
import io.odpf.firehose.tracer.SinkTracer;
import io.opentracing.Span;
@@ -27,7 +27,7 @@ public class FirehoseSyncConsumer implements FirehoseConsumer {
private final SinkTracer tracer;
private final ConsumerAndOffsetManager consumerAndOffsetManager;
private final FirehoseFilter firehoseFilter;
private final Instrumentation instrumentation;
private final FirehoseInstrumentation firehoseInstrumentation;

@Override
public void process() throws IOException {
@@ -44,20 +44,20 @@ public void process() throws IOException {
consumerAndOffsetManager.addOffsetsAndSetCommittable(filteredMessages.getValidMessages());
}
consumerAndOffsetManager.commit();
instrumentation.logInfo("Processed {} records in consumer", messages.size());
firehoseInstrumentation.logInfo("Processed {} records in consumer", messages.size());
tracer.finishTrace(spans);
} catch (FilterException e) {
throw new FirehoseConsumerFailedException(e);
} finally {
instrumentation.captureDurationSince(SOURCE_KAFKA_PARTITIONS_PROCESS_TIME_MILLISECONDS, beforeCall);
firehoseInstrumentation.captureDurationSince(SOURCE_KAFKA_PARTITIONS_PROCESS_TIME_MILLISECONDS, beforeCall);
}
}

@Override
public void close() throws IOException {
tracer.close();
consumerAndOffsetManager.close();
instrumentation.close();
firehoseInstrumentation.close();
sink.close();
}
}