Revert: "feat: redis sink using depot (#193)" #204

Merged 1 commit on Nov 23, 2022
2 changes: 1 addition & 1 deletion build.gradle
@@ -101,7 +101,7 @@ dependencies {
implementation 'com.google.cloud:google-cloud-storage:1.114.0'
implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
implementation group: 'io.odpf', name: 'depot', version: '0.3.3'
implementation group: 'io.odpf', name: 'depot', version: '0.2.1'
implementation(group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59') { exclude group: 'org.slf4j' }

testImplementation group: 'junit', name: 'junit', version: '4.11'
85 changes: 72 additions & 13 deletions docs/docs/sinks/redis-sink.md
@@ -1,21 +1,80 @@
# Redis Sink
# Redis

Redis Sink is implemented in Firehose using the Redis sink connector implementation in ODPF Depot. You can check out the ODPF Depot GitHub repository [here](https://github.com/odpf/depot).
A Redis sink Firehose \(`SINK_TYPE`=`redis`\) requires the following variables to be set along with the generic ones.

### Data Types
Redis sink can be created in 3 different modes based on the value of [`SINK_REDIS_DATA_TYPE`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_data_type): HashSet, KeyValue, or List
- `Hashset`: For each message, an entry of the format `key : field : value` is generated and pushed to Redis. Field and value are generated based on the config [`SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_hashset_field_to_column_mapping)
- `List`: For each message, an entry of the format `key : value` is generated and pushed to Redis. The value is fetched from the proto field name provided in the config [`SINK_REDIS_LIST_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_list_data_field_name)
- `KeyValue`: For each message, an entry of the format `key : value` is generated and pushed to Redis. The value is fetched from the proto field name provided in the config [`SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_key_value_data_field_name)
### `SINK_REDIS_URLS`

The `key` is picked up from a field in the message itself.
Redis instance hostname/IP address followed by its port. Multiple instances can be given as a comma-separated list.

Limitation: The Depot Redis sink currently supports only KeyValue, HashSet and List entries.
- Example value: `localhost:6379,localhost:6380`
- Type: `required`

### Configuration
### `SINK_REDIS_DATA_TYPE`

To use the Redis sink in Firehose, first set `SINK_TYPE`=`redis`. Some generic configs, common across sink types, also need to be set; these are described in [generic.md](../advance/generic.md). Redis sink specific configs are documented in the ODPF Depot repository; you can check them out [here](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md)
Selects whether the data is pushed as a HashSet, a KeyValue pair, or a List.

- Example value: `Hashset`
- Type: `required`
- Default value: `List`

### Deployment Types
Redis sink currently supports two deployment types, `Standalone` and `Cluster`. This can be configured via the Depot environment variable `SINK_REDIS_DEPLOYMENT_TYPE`.
### `SINK_REDIS_KEY_TEMPLATE`

The string that will act as the key for each Redis entry. The key can be a constant, or a template that extracts a value from each message and uses it as the Redis key.

- Example value: `Service\_%%s,1`

This will take the value of the proto field at index 1 and create the Redis key as per the template.

- Type: `required`
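
A minimal sketch of how such a template might be expanded, assuming the `Service\_%%s,1` syntax means "format pattern, comma, proto field index" (the helper below is hypothetical, not Firehose's actual parser):

```java
import java.util.function.Function;

// Hypothetical helper illustrating the key-template semantics described above.
class RedisKeyTemplate {
    static String buildKey(String template, Function<Integer, Object> fieldByIndex) {
        String[] parts = template.split(",");          // "Service_%%s" and "1"
        String pattern = parts[0].replace("%%", "%");  // un-escape the doubled percent
        int protoIndex = Integer.parseInt(parts[1].trim());
        return String.format(pattern, fieldByIndex.apply(protoIndex));
    }
}
```

With template `Service_%%s,1` and a message whose proto field at index 1 holds `42`, this yields the key `Service_42`.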

### `INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING`

This config decides which proto fields are stored in the HashSet for each message, and the column names they are stored under.

- Example value: `{"6":"customer_id", "2":"order_num"}`
- Type: `required (For Hashset)`

### `SINK_REDIS_LIST_DATA_PROTO_INDEX`

This config decides which proto field's value is stored in the List for each message.

- Example value: `6`

This will take the value of the proto field at index 6 and push it to the Redis list under the key built from the key template.

- Type: `required (For List)`

### `SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX`

This config decides which proto field's value is stored in the value part of the key-value pair.

- Example value: `6`

This will take the value of the proto field at index 6 and push it to Redis as the value under the key built from the key template.

- Type: `required (For KeyValue)`
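
Taken together, the three data types map onto plain Redis commands. A rough sketch with Jedis follows; the key and values are placeholders, and the real sink batches these operations rather than issuing them one by one:

```java
import redis.clients.jedis.Jedis;

public class RedisDataTypeDemo {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // Hashset: key : field : value, per INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING
            jedis.hset("Service_42", "customer_id", "cust-001");
            jedis.hset("Service_42", "order_num", "9876");

            // List: the configured proto field's value is pushed onto a list
            jedis.lpush("Service_42", "cust-001");

            // KeyValue: a plain SET of the configured proto field's value
            jedis.set("Service_42", "cust-001");
        }
    }
}
```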

### `SINK_REDIS_TTL_TYPE`

- Example value: `DURATION`
- Type: `optional`
- Default value: `DISABLE`
- Choice of Redis TTL type. It can be:
  - `DURATION`: relative TTL in seconds, after which the key expires and is removed from Redis
  - `EXACT_TIME`: precise UNIX timestamp after which the key expires

### `SINK_REDIS_TTL_VALUE`

Redis TTL value: a UNIX timestamp for the `EXACT_TIME` TTL type, or a duration in seconds for the `DURATION` TTL type.

- Example value: `100000`
- Type: `optional`
- Default value: `0`
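
In Redis terms, the two TTL types correspond to the `EXPIRE` and `EXPIREAT` commands. A brief Jedis sketch (key and values are placeholders):

```java
import redis.clients.jedis.Jedis;

class TtlDemo {
    // DURATION is a relative TTL; EXACT_TIME is an absolute UNIX timestamp in seconds.
    static void applyTtl(Jedis jedis, String key) {
        jedis.expire(key, 100000);         // SINK_REDIS_TTL_TYPE=DURATION
        jedis.expireAt(key, 1700000000L);  // SINK_REDIS_TTL_TYPE=EXACT_TIME
    }
}
```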

### `SINK_REDIS_DEPLOYMENT_TYPE`

The Redis deployment you are using. At present, we support `Standalone` and `Cluster` types.

- Example value: `Standalone`
- Type: `required`
- Default value: `Standalone`
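
The two deployment types imply different client classes. A hedged sketch of the distinction with Jedis (hosts and ports are placeholders):

```java
import java.util.Arrays;
import java.util.HashSet;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;

class RedisDeploymentDemo {
    // Standalone: a single node taken from SINK_REDIS_URLS.
    static Jedis standalone() {
        return new Jedis("localhost", 6379);
    }

    // Cluster: bootstraps from every node listed in SINK_REDIS_URLS.
    static JedisCluster cluster() {
        return new JedisCluster(new HashSet<>(Arrays.asList(
                new HostAndPort("localhost", 6379),
                new HostAndPort("localhost", 6380))));
    }
}
```
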
43 changes: 43 additions & 0 deletions src/main/java/io/odpf/firehose/config/RedisSinkConfig.java
@@ -0,0 +1,43 @@
package io.odpf.firehose.config;

import io.odpf.firehose.config.converter.RedisSinkDataTypeConverter;
import io.odpf.firehose.config.converter.RedisSinkTtlTypeConverter;
import io.odpf.firehose.config.converter.RedisSinkDeploymentTypeConverter;
import io.odpf.firehose.config.enums.RedisSinkDataType;
import io.odpf.firehose.config.enums.RedisSinkTtlType;
import io.odpf.firehose.config.enums.RedisSinkDeploymentType;

public interface RedisSinkConfig extends AppConfig {
@Key("SINK_REDIS_URLS")
String getSinkRedisUrls();

@Key("SINK_REDIS_KEY_TEMPLATE")
String getSinkRedisKeyTemplate();

@Key("SINK_REDIS_DATA_TYPE")
@DefaultValue("HASHSET")
@ConverterClass(RedisSinkDataTypeConverter.class)
RedisSinkDataType getSinkRedisDataType();

@Key("SINK_REDIS_LIST_DATA_PROTO_INDEX")
String getSinkRedisListDataProtoIndex();

@Key("SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX")
String getSinkRedisKeyValueDataProtoIndex();

@Key("SINK_REDIS_TTL_TYPE")
@DefaultValue("DISABLE")
@ConverterClass(RedisSinkTtlTypeConverter.class)
RedisSinkTtlType getSinkRedisTtlType();

@Key("SINK_REDIS_TTL_VALUE")
@DefaultValue("0")
long getSinkRedisTtlValue();

@Key("SINK_REDIS_DEPLOYMENT_TYPE")
@DefaultValue("Standalone")
@ConverterClass(RedisSinkDeploymentTypeConverter.class)
RedisSinkDeploymentType getSinkRedisDeploymentType();


}
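
Since `RedisSinkConfig` is an owner `Config` interface, it can be materialized from a plain map, exactly as `RedisSinkFactory` does further down in this diff. A minimal usage sketch (all values are placeholders):

```java
import io.odpf.firehose.config.RedisSinkConfig;
import org.aeonbits.owner.ConfigFactory;

import java.util.HashMap;
import java.util.Map;

class RedisSinkConfigDemo {
    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("SINK_REDIS_URLS", "localhost:6379");
        env.put("SINK_REDIS_KEY_TEMPLATE", "Service_%%s,1");
        env.put("SINK_REDIS_DATA_TYPE", "list");
        env.put("SINK_REDIS_LIST_DATA_PROTO_INDEX", "6");

        RedisSinkConfig config = ConfigFactory.create(RedisSinkConfig.class, env);
        System.out.println(config.getSinkRedisDataType()); // LIST, via the converter
        System.out.println(config.getSinkRedisTtlType());  // DISABLE, the default
    }
}
```
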
13 changes: 13 additions & 0 deletions src/main/java/io/odpf/firehose/config/converter/RedisSinkDataTypeConverter.java
@@ -0,0 +1,13 @@
package io.odpf.firehose.config.converter;

import io.odpf.firehose.config.enums.RedisSinkDataType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;

public class RedisSinkDataTypeConverter implements Converter<RedisSinkDataType> {
@Override
public RedisSinkDataType convert(Method method, String input) {
return RedisSinkDataType.valueOf(input.toUpperCase());
}
}
13 changes: 13 additions & 0 deletions src/main/java/io/odpf/firehose/config/converter/RedisSinkDeploymentTypeConverter.java
@@ -0,0 +1,13 @@
package io.odpf.firehose.config.converter;

import io.odpf.firehose.config.enums.RedisSinkDeploymentType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;

public class RedisSinkDeploymentTypeConverter implements Converter<RedisSinkDeploymentType> {
@Override
public RedisSinkDeploymentType convert(Method method, String input) {
return RedisSinkDeploymentType.valueOf(input.toUpperCase());
}
}
13 changes: 13 additions & 0 deletions src/main/java/io/odpf/firehose/config/converter/RedisSinkTtlTypeConverter.java
@@ -0,0 +1,13 @@
package io.odpf.firehose.config.converter;

import io.odpf.firehose.config.enums.RedisSinkTtlType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;

public class RedisSinkTtlTypeConverter implements Converter<RedisSinkTtlType> {
@Override
public RedisSinkTtlType convert(Method method, String input) {
return RedisSinkTtlType.valueOf(input.toUpperCase());
}
}
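
All three converters follow the same pattern: owner invokes them through the `@ConverterClass` annotation, and the `toUpperCase()` call makes the config values case-insensitive. They can also be exercised directly, since the `Method` argument is ignored:

```java
import io.odpf.firehose.config.converter.RedisSinkTtlTypeConverter;
import io.odpf.firehose.config.enums.RedisSinkTtlType;

class ConverterDemo {
    public static void main(String[] args) {
        // "duration" and "DURATION" both resolve to the same enum constant.
        RedisSinkTtlType ttl = new RedisSinkTtlTypeConverter().convert(null, "duration");
        System.out.println(ttl); // DURATION
    }
}
```
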
7 changes: 7 additions & 0 deletions src/main/java/io/odpf/firehose/config/enums/RedisSinkDataType.java
@@ -0,0 +1,7 @@
package io.odpf.firehose.config.enums;

public enum RedisSinkDataType {
LIST,
HASHSET,
KEYVALUE,
}
6 changes: 6 additions & 0 deletions src/main/java/io/odpf/firehose/config/enums/RedisSinkDeploymentType.java
@@ -0,0 +1,6 @@
package io.odpf.firehose.config.enums;

public enum RedisSinkDeploymentType {
STANDALONE,
CLUSTER
}
7 changes: 7 additions & 0 deletions src/main/java/io/odpf/firehose/config/enums/RedisSinkTtlType.java
@@ -0,0 +1,7 @@
package io.odpf.firehose.config.enums;

public enum RedisSinkTtlType {
EXACT_TIME,
DURATION,
DISABLE
}
14 changes: 3 additions & 11 deletions src/main/java/io/odpf/firehose/sink/SinkFactory.java
@@ -3,12 +3,9 @@
import io.odpf.depot.bigquery.BigQuerySink;
import io.odpf.depot.bigquery.BigQuerySinkFactory;
import io.odpf.depot.config.BigQuerySinkConfig;
import io.odpf.depot.config.RedisSinkConfig;
import io.odpf.depot.log.LogSink;
import io.odpf.depot.log.LogSinkFactory;
import io.odpf.depot.metrics.StatsDReporter;
import io.odpf.depot.redis.RedisSink;
import io.odpf.depot.redis.RedisSinkFactory;
import io.odpf.firehose.config.KafkaConsumerConfig;
import io.odpf.firehose.config.enums.SinkType;
import io.odpf.firehose.consumer.kafka.OffsetManager;
@@ -23,6 +20,7 @@
import io.odpf.firehose.sink.jdbc.JdbcSinkFactory;
import io.odpf.firehose.sink.mongodb.MongoSinkFactory;
import io.odpf.firehose.sink.prometheus.PromSinkFactory;
import io.odpf.firehose.sink.redis.RedisSinkFactory;
import io.odpf.stencil.client.StencilClient;
import org.aeonbits.owner.ConfigFactory;

@@ -36,7 +34,6 @@ public class SinkFactory {
private final OffsetManager offsetManager;
private BigQuerySinkFactory bigQuerySinkFactory;
private LogSinkFactory logSinkFactory;
private RedisSinkFactory redisSinkFactory;
private final Map<String, String> config;

public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig,
@@ -60,6 +57,7 @@ public void init() {
case HTTP:
case INFLUXDB:
case ELASTICSEARCH:
case REDIS:
case GRPC:
case PROMETHEUS:
case BLOB:
@@ -69,12 +67,6 @@
logSinkFactory = new LogSinkFactory(config, statsDReporter);
logSinkFactory.init();
return;
case REDIS:
redisSinkFactory = new RedisSinkFactory(
ConfigFactory.create(RedisSinkConfig.class, config),
statsDReporter);
redisSinkFactory.init();
return;
case BIGQUERY:
BigquerySinkUtils.addMetadataColumns(config);
bigQuerySinkFactory = new BigQuerySinkFactory(
@@ -103,7 +95,7 @@ public Sink getSink() {
case ELASTICSEARCH:
return EsSinkFactory.create(config, statsDReporter, stencilClient);
case REDIS:
return new GenericOdpfSink(new FirehoseInstrumentation(statsDReporter, RedisSink.class), sinkType.name(), redisSinkFactory.create());
return RedisSinkFactory.create(config, statsDReporter, stencilClient);
case GRPC:
return GrpcSinkFactory.create(config, statsDReporter, stencilClient);
case PROMETHEUS:
57 changes: 57 additions & 0 deletions src/main/java/io/odpf/firehose/sink/redis/RedisSink.java
@@ -0,0 +1,57 @@
package io.odpf.firehose.sink.redis;

import io.odpf.firehose.message.Message;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
import io.odpf.firehose.sink.AbstractSink;
import io.odpf.firehose.sink.redis.client.RedisClient;
import io.odpf.firehose.sink.redis.exception.NoResponseException;

import java.util.List;

/**
 * RedisSink allows messages consumed from Kafka to be persisted to Redis.
 * The related configurations for RedisSink can be found in {@link io.odpf.firehose.config.RedisSinkConfig}.
 */
public class RedisSink extends AbstractSink {

private RedisClient redisClient;

/**
* Instantiates a new Redis sink.
*
* @param firehoseInstrumentation the instrumentation
* @param sinkType the sink type
* @param redisClient the redis client
*/
public RedisSink(FirehoseInstrumentation firehoseInstrumentation, String sinkType, RedisClient redisClient) {
super(firehoseInstrumentation, sinkType);
this.redisClient = redisClient;
}

/**
* Processes messages before sending to Redis.
*
* @param messages the messages
*/
@Override
protected void prepare(List<Message> messages) {
redisClient.prepare(messages);
}

/**
* Sends data to Redis.
*
* @return the list of messages that failed to be pushed
* @throws NoResponseException when no response is received from Redis
*/
@Override
protected List<Message> execute() throws NoResponseException {
return redisClient.execute();
}

@Override
public void close() {
getFirehoseInstrumentation().logInfo("Redis connection closing");
redisClient.close();
}
}
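
The `RedisClient` used above is not part of this diff. Inferred purely from the calls in `RedisSink` (`prepare`, `execute`, `close`), its shape is roughly the following; the real interface may differ in detail:

```java
package io.odpf.firehose.sink.redis.client;

import io.odpf.firehose.message.Message;

import java.util.List;

// Sketch inferred from RedisSink's usage, not the actual source.
public interface RedisClient {
    void prepare(List<Message> messages); // translate records into pending Redis operations
    List<Message> execute();              // flush them; return messages that failed
    void close();                         // release the underlying connection
}
```
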
51 changes: 51 additions & 0 deletions src/main/java/io/odpf/firehose/sink/redis/RedisSinkFactory.java
@@ -0,0 +1,51 @@
package io.odpf.firehose.sink.redis;


import io.odpf.depot.metrics.StatsDReporter;
import io.odpf.firehose.config.RedisSinkConfig;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
import io.odpf.firehose.sink.AbstractSink;
import io.odpf.firehose.sink.redis.client.RedisClient;
import io.odpf.firehose.sink.redis.client.RedisClientFactory;
import io.odpf.stencil.client.StencilClient;
import org.aeonbits.owner.ConfigFactory;

import java.util.Map;

/**
* Factory class to create the RedisSink.
* <p>
* Firehose reflectively instantiates this factory with the supplied
* configuration and invokes {@link #create(Map, StatsDReporter, StencilClient)}
* to obtain the RedisSink implementation.
*/
public class RedisSinkFactory {

/**
* Creates Redis sink.
*
* @param configuration the configuration
* @param statsDReporter the StatsD reporter
* @param stencilClient the stencil client
* @return the abstract sink
*/
public static AbstractSink create(Map<String, String> configuration, StatsDReporter statsDReporter, StencilClient stencilClient) {
RedisSinkConfig redisSinkConfig = ConfigFactory.create(RedisSinkConfig.class, configuration);
FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, RedisSinkFactory.class);
String redisConfig = String.format("\n\tredis.urls = %s\n\tredis.key.template = %s\n\tredis.sink.type = %s"
+ "\n\tredis.list.data.proto.index = %s\n\tredis.ttl.type = %s\n\tredis.ttl.value = %d",
redisSinkConfig.getSinkRedisUrls(),
redisSinkConfig.getSinkRedisKeyTemplate(),
redisSinkConfig.getSinkRedisDataType().toString(),
redisSinkConfig.getSinkRedisListDataProtoIndex(),
redisSinkConfig.getSinkRedisTtlType().toString(),
redisSinkConfig.getSinkRedisTtlValue());
firehoseInstrumentation.logDebug(redisConfig);
firehoseInstrumentation.logInfo("Redis server type = {}", redisSinkConfig.getSinkRedisDeploymentType());

RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient);
RedisClient client = redisClientFactory.getClient();
firehoseInstrumentation.logInfo("Connection to redis established successfully");
return new RedisSink(new FirehoseInstrumentation(statsDReporter, RedisSink.class), "redis", client);
}
}