chore: logging fix with bigquery optimisations (#155)
lavkesh authored Feb 14, 2022
1 parent 0aa5e20 commit fe596a6
Showing 7 changed files with 199 additions and 85 deletions.
6 changes: 5 additions & 1 deletion build.gradle
@@ -76,11 +76,14 @@ dependencies {
implementation group: 'org.influxdb', name: 'influxdb-java', version: '2.5'
implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.4.0'
implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.10'
implementation group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.35'
implementation group: 'redis.clients', name: 'jedis', version: '3.0.1'
implementation group: 'io.opentracing', name: 'opentracing-api', version: '0.33.0'
implementation group: 'io.opentracing.contrib', name: 'opentracing-kafka-client', version: '0.1.4'
implementation group: 'io.jaegertracing', name: 'jaeger-client', version: '1.0.0'
implementation group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '6.3.1'
implementation(group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '6.3.1') {
exclude group: "log4j", module: "log4j"
}
implementation 'io.confluent:monitoring-interceptors:3.0.0'
implementation "io.grpc:grpc-all:1.18.0"
implementation group: 'org.jfrog.buildinfo', name: 'build-info-extractor', version: '2.6.3'
@@ -274,6 +277,7 @@ jacocoTestCoverageVerification {
'**/exception/**',
'**/serializer/**',
'**/cortexpb/**',
'**/SinkFactory**',
'**/Clock**',
'**/KafkaUtils**',
'**/ConsumerRebalancer**',
@@ -4,6 +4,7 @@
import io.odpf.firehose.consumer.kafka.ConsumerAndOffsetManager;
import io.odpf.firehose.consumer.kafka.FirehoseKafkaConsumer;
import io.odpf.firehose.consumer.kafka.OffsetManager;
import io.odpf.firehose.sink.SinkFactory;
import io.odpf.firehose.utils.KafkaUtils;
import io.odpf.firehose.config.AppConfig;
import io.odpf.firehose.config.DlqConfig;
@@ -13,7 +14,6 @@
import io.odpf.firehose.config.SinkPoolConfig;
import io.odpf.firehose.config.enums.KafkaConsumerMode;
import io.odpf.firehose.sink.SinkPool;
import io.odpf.firehose.exception.ConfigurationException;
import io.odpf.firehose.filter.Filter;
import io.odpf.firehose.filter.NoOpFilter;
import io.odpf.firehose.filter.jexl.JexlFilter;
@@ -22,18 +22,7 @@
import io.odpf.firehose.metrics.Instrumentation;
import io.odpf.firehose.metrics.StatsDReporter;
import io.odpf.firehose.sink.Sink;
import io.odpf.firehose.sink.bigquery.BigQuerySinkFactory;
import io.odpf.firehose.sink.elasticsearch.EsSinkFactory;
import io.odpf.firehose.sink.grpc.GrpcSinkFactory;
import io.odpf.firehose.sink.http.HttpSinkFactory;
import io.odpf.firehose.sink.influxdb.InfluxSinkFactory;
import io.odpf.firehose.sink.jdbc.JdbcSinkFactory;
import io.odpf.firehose.sink.log.KeyOrMessageParser;
import io.odpf.firehose.sink.log.LogSinkFactory;
import io.odpf.firehose.sink.mongodb.MongoSinkFactory;
import io.odpf.firehose.sink.blob.BlobSinkFactory;
import io.odpf.firehose.sink.prometheus.PromSinkFactory;
import io.odpf.firehose.sink.redis.RedisSinkFactory;
import io.odpf.firehose.sinkdecorator.BackOff;
import io.odpf.firehose.sinkdecorator.BackOffProvider;
import io.odpf.firehose.error.ErrorHandler;
@@ -135,8 +124,10 @@ public FirehoseConsumer buildConsumer() {
FirehoseKafkaConsumer firehoseKafkaConsumer = KafkaUtils.createConsumer(kafkaConsumerConfig, config, statsDReporter, tracer);
SinkTracer firehoseTracer = new SinkTracer(tracer, kafkaConsumerConfig.getSinkType().name() + " SINK",
kafkaConsumerConfig.isTraceJaegarEnable());
SinkFactory sinkFactory = new SinkFactory(kafkaConsumerConfig, statsDReporter, stencilClient, offsetManager);
sinkFactory.init();
if (kafkaConsumerConfig.getSourceKafkaConsumerMode().equals(KafkaConsumerMode.SYNC)) {
Sink sink = createSink(tracer);
Sink sink = createSink(tracer, sinkFactory);
ConsumerAndOffsetManager consumerAndOffsetManager = new ConsumerAndOffsetManager(Collections.singletonList(sink), offsetManager, firehoseKafkaConsumer, kafkaConsumerConfig, new Instrumentation(statsDReporter, ConsumerAndOffsetManager.class));
return new FirehoseSyncConsumer(
sink,
@@ -149,7 +140,7 @@ public FirehoseConsumer buildConsumer() {
int nThreads = sinkPoolConfig.getSinkPoolNumThreads();
List<Sink> sinks = new ArrayList<>(nThreads);
for (int ii = 0; ii < nThreads; ii++) {
sinks.add(createSink(tracer));
sinks.add(createSink(tracer, sinkFactory));
}
ConsumerAndOffsetManager consumerAndOffsetManager = new ConsumerAndOffsetManager(sinks, offsetManager, firehoseKafkaConsumer, kafkaConsumerConfig, new Instrumentation(statsDReporter, ConsumerAndOffsetManager.class));
SinkPool sinkPool = new SinkPool(
@@ -165,45 +156,9 @@ public FirehoseConsumer buildConsumer() {
}
}

/**
* return the basic Sink implementation based on the config.
*
* @return Sink
*/
private Sink getSink() {
instrumentation.logInfo("Sink Type: {}", kafkaConsumerConfig.getSinkType().toString());
switch (kafkaConsumerConfig.getSinkType()) {
case JDBC:
return JdbcSinkFactory.create(config, statsDReporter, stencilClient);
case HTTP:
return HttpSinkFactory.create(config, statsDReporter, stencilClient);
case INFLUXDB:
return InfluxSinkFactory.create(config, statsDReporter, stencilClient);
case LOG:
return LogSinkFactory.create(config, statsDReporter, stencilClient);
case ELASTICSEARCH:
return EsSinkFactory.create(config, statsDReporter, stencilClient);
case REDIS:
return RedisSinkFactory.create(config, statsDReporter, stencilClient);
case GRPC:
return GrpcSinkFactory.create(config, statsDReporter, stencilClient);
case PROMETHEUS:
return PromSinkFactory.create(config, statsDReporter, stencilClient);
case BLOB:
return BlobSinkFactory.create(config, offsetManager, statsDReporter, stencilClient);
case BIGQUERY:
return BigQuerySinkFactory.create(config, statsDReporter);
case MONGODB:
return MongoSinkFactory.create(config, statsDReporter, stencilClient);
default:
throw new ConfigurationException("Invalid Firehose SINK_TYPE");

}
}

private Sink createSink(Tracer tracer) {
private Sink createSink(Tracer tracer, SinkFactory sinkFactory) {
ErrorHandler errorHandler = new ErrorHandler(ConfigFactory.create(ErrorConfig.class, config));
Sink baseSink = getSink();
Sink baseSink = sinkFactory.getSink();
Sink sinkWithFailHandler = new SinkWithFailHandler(baseSink, errorHandler);
Sink sinkWithRetry = withRetry(sinkWithFailHandler, errorHandler);
Sink sinWithDLQ = withDlq(sinkWithRetry, tracer, errorHandler);
104 changes: 86 additions & 18 deletions src/main/java/io/odpf/firehose/sink/SinkFactory.java
@@ -1,29 +1,97 @@
package io.odpf.firehose.sink;


import io.odpf.firehose.config.KafkaConsumerConfig;
import io.odpf.firehose.consumer.kafka.OffsetManager;
import io.odpf.firehose.exception.ConfigurationException;
import io.odpf.firehose.metrics.Instrumentation;
import io.odpf.firehose.metrics.StatsDReporter;
import io.odpf.firehose.sink.bigquery.BigQuerySinkFactory;
import io.odpf.firehose.sink.blob.BlobSinkFactory;
import io.odpf.firehose.sink.elasticsearch.EsSinkFactory;
import io.odpf.firehose.sink.grpc.GrpcSinkFactory;
import io.odpf.firehose.sink.http.HttpSinkFactory;
import io.odpf.firehose.sink.influxdb.InfluxSinkFactory;
import io.odpf.firehose.sink.jdbc.JdbcSinkFactory;
import io.odpf.firehose.sink.log.LogSinkFactory;
import io.odpf.firehose.sink.mongodb.MongoSinkFactory;
import io.odpf.firehose.sink.prometheus.PromSinkFactory;
import io.odpf.firehose.sink.redis.RedisSinkFactory;
import io.odpf.stencil.client.StencilClient;

import java.util.Map;

/**
* Interface to create the sink.
*
* Any sink {@see Sink} can be created by a class implementing this interface.
* The consumer framework would reflectively instantiate this factory
* using the configurations supplied and invoke {@see #create(Map<String, String> configuration, StatsDClient client)}
* to obtain the sink implementation.
*
*/
public interface SinkFactory {
public class SinkFactory {
private final KafkaConsumerConfig kafkaConsumerConfig;
private final StatsDReporter statsDReporter;
private final Instrumentation instrumentation;
private final StencilClient stencilClient;
private final OffsetManager offsetManager;
private BigQuerySinkFactory bigQuerySinkFactory;
private final Map<String, String> config = System.getenv();

public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig,
StatsDReporter statsDReporter,
StencilClient stencilClient,
OffsetManager offsetManager) {
instrumentation = new Instrumentation(statsDReporter, SinkFactory.class);
this.kafkaConsumerConfig = kafkaConsumerConfig;
this.statsDReporter = statsDReporter;
this.stencilClient = stencilClient;
this.offsetManager = offsetManager;
}

/**
* method to create the sink from configuration supplied.
*
* @param configuration key/value configuration supplied as a map
* @param client {@see StatsDClient}
* @param stencilClient {@see StencilClient}
* @return instance of sink to which messages consumed from kafka can be forwarded to. {@see Sink}
* Initialization method for all the sinks.
*/
Sink create(Map<String, String> configuration, StatsDReporter client, StencilClient stencilClient);
public void init() {
switch (this.kafkaConsumerConfig.getSinkType()) {
case JDBC:
case HTTP:
case INFLUXDB:
case LOG:
case ELASTICSEARCH:
case REDIS:
case GRPC:
case PROMETHEUS:
case BLOB:
case MONGODB:
return;
case BIGQUERY:
bigQuerySinkFactory = new BigQuerySinkFactory(config, statsDReporter);
bigQuerySinkFactory.init();
return;
default:
throw new ConfigurationException("Invalid Firehose SINK_TYPE");
}
}

public Sink getSink() {
instrumentation.logInfo("Sink Type: {}", kafkaConsumerConfig.getSinkType().toString());
switch (kafkaConsumerConfig.getSinkType()) {
case JDBC:
return JdbcSinkFactory.create(config, statsDReporter, stencilClient);
case HTTP:
return HttpSinkFactory.create(config, statsDReporter, stencilClient);
case INFLUXDB:
return InfluxSinkFactory.create(config, statsDReporter, stencilClient);
case LOG:
return LogSinkFactory.create(config, statsDReporter, stencilClient);
case ELASTICSEARCH:
return EsSinkFactory.create(config, statsDReporter, stencilClient);
case REDIS:
return RedisSinkFactory.create(config, statsDReporter, stencilClient);
case GRPC:
return GrpcSinkFactory.create(config, statsDReporter, stencilClient);
case PROMETHEUS:
return PromSinkFactory.create(config, statsDReporter, stencilClient);
case BLOB:
return BlobSinkFactory.create(config, offsetManager, statsDReporter, stencilClient);
case BIGQUERY:
return bigQuerySinkFactory.create();
case MONGODB:
return MongoSinkFactory.create(config, statsDReporter, stencilClient);
default:
throw new ConfigurationException("Invalid Firehose SINK_TYPE");
}
}
}
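
The refactor above replaces per-call sink construction in the consumer factory with a SinkFactory that is built once, initialised once, and then asked for sinks. A hypothetical wiring sketch (not part of the commit) of the intended construct → init() → getSink() lifecycle, mirroring the FirehoseConsumerFactory change earlier in this diff:

    // Hypothetical wiring sketch, not code from this commit.
    import io.odpf.firehose.config.KafkaConsumerConfig;
    import io.odpf.firehose.consumer.kafka.OffsetManager;
    import io.odpf.firehose.metrics.StatsDReporter;
    import io.odpf.firehose.sink.Sink;
    import io.odpf.firehose.sink.SinkFactory;
    import io.odpf.stencil.client.StencilClient;
    import java.util.ArrayList;
    import java.util.List;

    final class SinkPoolWiring {
        static List<Sink> buildSinks(KafkaConsumerConfig consumerConfig,
                                     StatsDReporter statsDReporter,
                                     StencilClient stencilClient,
                                     OffsetManager offsetManager,
                                     int nThreads) {
            SinkFactory sinkFactory = new SinkFactory(consumerConfig, statsDReporter, stencilClient, offsetManager);
            sinkFactory.init();                   // one-time heavy setup (for BIGQUERY: client, Stencil, schema listener)
            List<Sink> sinks = new ArrayList<>(nThreads);
            for (int i = 0; i < nThreads; i++) {
                sinks.add(sinkFactory.getSink()); // per-sink creation reuses the shared state
            }
            return sinks;
        }
    }

Because init() is now the only place that builds the BigQuery client and its Stencil schema listener, an async consumer pool of N sinks no longer repeats that setup N times, which appears to be the "bigquery optimisations" part of the commit title.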
@@ -23,11 +23,22 @@

public class BigQuerySinkFactory {

public static Sink create(Map<String, String> env, StatsDReporter statsDReporter) {
BigQuerySinkConfig sinkConfig = ConfigFactory.create(BigQuerySinkConfig.class, env);
private BigQueryClient bigQueryClient;
private MessageRecordConverterCache recordConverterWrapper;
private BigQueryRow rowCreator;
private final StatsDReporter statsDReporter;
private final Map<String, String> config;

public BigQuerySinkFactory(Map<String, String> env, StatsDReporter statsDReporter) {
this.config = env;
this.statsDReporter = statsDReporter;
}

public void init() {
BigQuerySinkConfig sinkConfig = ConfigFactory.create(BigQuerySinkConfig.class, config);
try {
BigQueryClient bigQueryClient = new BigQueryClient(sinkConfig, new Instrumentation(statsDReporter, BigQueryClient.class));
MessageRecordConverterCache recordConverterWrapper = new MessageRecordConverterCache();
this.bigQueryClient = new BigQueryClient(sinkConfig, new Instrumentation(statsDReporter, BigQueryClient.class));
this.recordConverterWrapper = new MessageRecordConverterCache();
StencilClient stencilClient;
ProtoUpdateListener protoUpdateListener = new ProtoUpdateListener(sinkConfig, bigQueryClient, recordConverterWrapper);
StencilConfig stencilConfig = StencilUtils.getStencilConfig(sinkConfig, statsDReporter.getClient(), protoUpdateListener);
@@ -39,20 +50,22 @@ public static Sink create(Map<String, String> env, StatsDReporter statsDReporter
Parser parser = stencilClient.getParser(sinkConfig.getInputSchemaProtoClass());
protoUpdateListener.setStencilParser(parser);
protoUpdateListener.onSchemaUpdate(stencilClient.getAll());
BigQueryRow rowCreator;
if (sinkConfig.isRowInsertIdEnabled()) {
rowCreator = new BigQueryRowWithInsertId();
this.rowCreator = new BigQueryRowWithInsertId();
} else {
rowCreator = new BigQueryRowWithoutInsertId();
this.rowCreator = new BigQueryRowWithoutInsertId();
}
return new BigQuerySink(
new Instrumentation(statsDReporter, BigQuerySink.class),
SinkType.BIGQUERY.name(),
bigQueryClient,
recordConverterWrapper,
rowCreator);
} catch (IOException e) {
throw new IllegalArgumentException("Exception occurred while creating sink", e);
}
}

public Sink create() {
return new BigQuerySink(
new Instrumentation(statsDReporter, BigQuerySink.class),
SinkType.BIGQUERY.name(),
bigQueryClient,
recordConverterWrapper,
rowCreator);
}
}
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Random;

public class BigQueryClient {
private final BigQuery bigquery;
@@ -34,6 +35,9 @@ public class BigQueryClient {
private final BigQuerySinkConfig bqConfig;
private final BQTableDefinition bqTableDefinition;
private final Instrumentation instrumentation;
private static final int TABLE_INFO_UPDATE_RETRIES = 10;
private static final int DEFAULT_SLEEP_RETRY = 10000;
private final Random random = new Random(System.currentTimeMillis());

public BigQueryClient(BigQuerySinkConfig bqConfig, Instrumentation instrumentation) throws IOException {
this(getBigQueryInstance(bqConfig), bqConfig, instrumentation);
@@ -72,7 +76,29 @@ public void upsertTable(List<Field> bqSchemaFields) throws BigQueryException {
TableInfo tableInfo = TableInfo.newBuilder(tableID, tableDefinition)
.setLabels(bqConfig.getTableLabels())
.build();
upsertDatasetAndTable(tableInfo);
upsertDatasetAndTableWithRetry(tableInfo);
}

private void upsertDatasetAndTableWithRetry(TableInfo info) {
for (int ii = 0; ii < TABLE_INFO_UPDATE_RETRIES; ii++) {
try {
upsertDatasetAndTable(info);
return;
} catch (BigQueryException e) {
instrumentation.logWarn(e.getMessage());
if (e.getMessage().contains("Exceeded rate limits")) {
try {
int sleepMillis = random.nextInt(DEFAULT_SLEEP_RETRY);
instrumentation.logInfo("Waiting for " + sleepMillis + " milliseconds");
Thread.sleep(sleepMillis);
} catch (InterruptedException interruptedException) {
instrumentation.captureNonFatalError(interruptedException, "Sleep interrupted");
}
} else {
throw e;
}
}
}
}

private void upsertDatasetAndTable(TableInfo tableInfo) {
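
The upsertDatasetAndTableWithRetry method added above retries the dataset/table upsert up to TABLE_INFO_UPDATE_RETRIES (10) times; when BigQuery reports "Exceeded rate limits" it sleeps a random interval of up to DEFAULT_SLEEP_RETRY (10 000 ms) before the next attempt, and any other BigQueryException is rethrown immediately. A condensed, standalone sketch of the same bounded-retry-with-jitter pattern (not code from this commit; unlike the committed method, it rethrows once the attempts are exhausted):

    // Sketch only: same retry-on-rate-limit idea as upsertDatasetAndTableWithRetry,
    // extracted into a reusable helper. Assumes maxAttempts >= 1 and maxSleepMillis > 0.
    import com.google.cloud.bigquery.BigQueryException;
    import java.util.Random;

    final class RateLimitRetry {
        private static final Random RANDOM = new Random();

        static void run(Runnable call, int maxAttempts, int maxSleepMillis) {
            BigQueryException last = null;
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                try {
                    call.run();
                    return;
                } catch (BigQueryException e) {
                    last = e;
                    // Only rate-limit errors are retried; everything else fails fast.
                    if (e.getMessage() == null || !e.getMessage().contains("Exceeded rate limits")) {
                        throw e;
                    }
                    try {
                        Thread.sleep(RANDOM.nextInt(maxSleepMillis)); // uniform jitter before the next attempt
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                }
            }
            throw last; // attempts exhausted
        }
    }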
5 changes: 5 additions & 0 deletions src/main/resources/logback.xml
@@ -9,4 +9,9 @@
<root level="${LOG_LEVEL:-INFO}">
<appender-ref ref="STDOUT"/>
</root>
<contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
<resetJUL>true</resetJUL>
</contextListener>
<!-- Adding a different log level to fix https://github.com/protocolbuffers/protobuf/issues/9478 -->
<logger name="com.google.protobuf.TextFormat" level="WARN"/>
</configuration>
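
The logging fix has three cooperating pieces: the jul-to-slf4j dependency added in build.gradle, the LevelChangePropagator above (with resetJUL) so Logback level changes are mirrored into java.util.logging and disabled JUL statements stay cheap, and a WARN threshold for the noisy com.google.protobuf.TextFormat logger. The bridge only takes effect once SLF4JBridgeHandler is installed in application code; that wiring is not shown in this diff, but a typical, assumed setup looks like:

    // Assumed startup wiring, not shown in this commit.
    import org.slf4j.bridge.SLF4JBridgeHandler;

    final class JulBridgeSetup {
        private JulBridgeSetup() {
        }

        // Route java.util.logging through SLF4J/Logback so JUL output obeys logback.xml.
        static void installOnce() {
            SLF4JBridgeHandler.removeHandlersForRootLogger(); // drop JUL's default console handler
            SLF4JBridgeHandler.install();                     // forward JUL records to SLF4J
        }
    }

Such an install would normally run once at startup, before any JUL-based client library (for example the BigQuery or gRPC clients) emits its first log record.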