diff --git a/build.gradle b/build.gradle index 06f4b5b5b..55717b00e 100644 --- a/build.gradle +++ b/build.gradle @@ -101,7 +101,7 @@ dependencies { implementation 'com.google.cloud:google-cloud-storage:1.114.0' implementation 'com.google.cloud:google-cloud-bigquery:1.115.0' implementation 'org.apache.logging.log4j:log4j-core:2.17.1' - implementation group: 'io.odpf', name: 'depot', version: '0.3.3' + implementation group: 'io.odpf', name: 'depot', version: '0.2.1' implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j' testImplementation group: 'junit', name: 'junit', version: '4.11' diff --git a/docs/docs/sinks/redis-sink.md b/docs/docs/sinks/redis-sink.md index 5321874d6..d0ab553d7 100644 --- a/docs/docs/sinks/redis-sink.md +++ b/docs/docs/sinks/redis-sink.md @@ -1,21 +1,80 @@ -# Redis Sink +# Redis -Redis Sink is implemented in Firehose using the Redis sink connector implementation in ODPF Depot. You can check out ODPF Depot Github repository [here](https://github.com/odpf/depot). +A Redis sink Firehose \(`SINK_TYPE`=`redis`\) requires the following variables to be set along with Generic ones -### Data Types -Redis sink can be created in 3 different modes based on the value of [`SINK_REDIS_DATA_TYPE`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_data_type): HashSet, KeyValue or List -- `Hashset`: For each message, an entry of the format `key : field : value` is generated and pushed to Redis. Field and value are generated on the basis of the config [`SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_hashset_field_to_column_mapping) -- `List`: For each message entry of the format `key : value` is generated and pushed to Redis. 
Value is fetched for the Proto field name provided in the config [`SINK_REDIS_LIST_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_list_data_field_name) -- `KeyValue`: For each message entry of the format `key : value` is generated and pushed to Redis. Value is fetched for the proto field name provided in the config [`SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_key_value_data_field_name) +### `SINK_REDIS_URLS` -The `key` is picked up from a field in the message itself. +REDIS instance hostname/IP address followed by its port. -Limitation: Depot Redis sink only supports Key-Value, HashSet and List entries as of now. +- Example value: `localhost:6379,localhost:6380` +- Type: `required` -### Configuration +### `SINK_REDIS_DATA_TYPE` -For Redis sink in Firehose we need to set first (`SINK_TYPE`=`redis`). There are some generic configs which are common across different sink types which need to be set which are mentioned in [generic.md](../advance/generic.md). Redis sink specific configs are mentioned in ODPF Depot repository. You can check out the Redis Sink configs [here](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md) +To select whether you want to push your data as a HashSet or as a List. +- Example value: `Hashset` +- Type: `required` +- Default value: `List` -### Deployment Types -Redis sink, as of now, supports two different Deployment Types `Standalone` and `Cluster`. This can be configured in the Depot environment variable `SINK_REDIS_DEPLOYMENT_TYPE`. +### `SINK_REDIS_KEY_TEMPLATE` + +The string that will act as the key for each Redis entry. This key can be configured as per the requirement, a constant or can extract value from each message and use that as the Redis key. 
+ +- Example value: `Service\_%%s,1` + + This will take the value with index 1 from proto and create the Redis keys as per the template\ + +- Type: `required` + +### `INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING` + +This is the field that decides what all data will be stored in the HashSet for each message. + +- Example value: `{"6":"customer_id", "2":"order_num"}` +- Type: `required (For Hashset)` + +### `SINK_REDIS_LIST_DATA_PROTO_INDEX` + +This field decides what all data will be stored in the List for each message. + +- Example value: `6` + + This will get the value of the field with index 6 in your proto and push that to the Redis list with the corresponding keyTemplate\ + +- Type: `required (For List)` + +### `SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX` + +This field decides what data will be stored in the value part of key-value pair + +- Example value: `6` + + This will get the value of the field with index 6 in your proto and push that to the Redis as value with the corresponding keyTemplate\ + +- Type: `required (For KeyValue)` + +### `SINK_REDIS_TTL_TYPE` + +- Example value: `DURATION` +- Type: `optional` +- Default value: `DISABLE` +- Choice of Redis TTL type. It can be:\ + - `DURATION`: After which the Key will be expired and removed from Redis \(UNIT- seconds\)\ + - `EXACT_TIME`: Precise UNIX timestamp after which the Key will be expired + +### `SINK_REDIS_TTL_VALUE` + +Redis TTL value in Unix Timestamp for `EXACT_TIME` TTL type, In Seconds for `DURATION` TTL type. + +- Example value: `100000` +- Type: `optional` +- Default value: `0` + +### `SINK_REDIS_DEPLOYMENT_TYPE` + +The Redis deployment you are using. At present, we support `Standalone` and `Cluster` types. 
+ +- Example value: `Standalone` +- Type: `required` +- Default value: `Standalone` diff --git a/src/main/java/io/odpf/firehose/config/RedisSinkConfig.java b/src/main/java/io/odpf/firehose/config/RedisSinkConfig.java new file mode 100644 index 000000000..7e6201984 --- /dev/null +++ b/src/main/java/io/odpf/firehose/config/RedisSinkConfig.java @@ -0,0 +1,43 @@ +package io.odpf.firehose.config; + +import io.odpf.firehose.config.converter.RedisSinkDataTypeConverter; +import io.odpf.firehose.config.converter.RedisSinkTtlTypeConverter; +import io.odpf.firehose.config.converter.RedisSinkDeploymentTypeConverter; +import io.odpf.firehose.config.enums.RedisSinkDataType; +import io.odpf.firehose.config.enums.RedisSinkTtlType; +import io.odpf.firehose.config.enums.RedisSinkDeploymentType; + +public interface RedisSinkConfig extends AppConfig { + @Key("SINK_REDIS_URLS") + String getSinkRedisUrls(); + + @Key("SINK_REDIS_KEY_TEMPLATE") + String getSinkRedisKeyTemplate(); + + @Key("SINK_REDIS_DATA_TYPE") + @DefaultValue("HASHSET") + @ConverterClass(RedisSinkDataTypeConverter.class) + RedisSinkDataType getSinkRedisDataType(); + + @Key("SINK_REDIS_LIST_DATA_PROTO_INDEX") + String getSinkRedisListDataProtoIndex(); + + @Key("SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX") + String getSinkRedisKeyValuetDataProtoIndex(); + + @Key("SINK_REDIS_TTL_TYPE") + @DefaultValue("DISABLE") + @ConverterClass(RedisSinkTtlTypeConverter.class) + RedisSinkTtlType getSinkRedisTtlType(); + + @Key("SINK_REDIS_TTL_VALUE") + @DefaultValue("0") + long getSinkRedisTtlValue(); + + @Key("SINK_REDIS_DEPLOYMENT_TYPE") + @DefaultValue("Standalone") + @ConverterClass(RedisSinkDeploymentTypeConverter.class) + RedisSinkDeploymentType getSinkRedisDeploymentType(); + + +} diff --git a/src/main/java/io/odpf/firehose/config/converter/RedisSinkDataTypeConverter.java b/src/main/java/io/odpf/firehose/config/converter/RedisSinkDataTypeConverter.java new file mode 100644 index 000000000..06b64889b --- /dev/null +++ 
b/src/main/java/io/odpf/firehose/config/converter/RedisSinkDataTypeConverter.java @@ -0,0 +1,13 @@ +package io.odpf.firehose.config.converter; + +import io.odpf.firehose.config.enums.RedisSinkDataType; +import org.aeonbits.owner.Converter; + +import java.lang.reflect.Method; + +public class RedisSinkDataTypeConverter implements Converter { + @Override + public RedisSinkDataType convert(Method method, String input) { + return RedisSinkDataType.valueOf(input.toUpperCase()); + } +} diff --git a/src/main/java/io/odpf/firehose/config/converter/RedisSinkDeploymentTypeConverter.java b/src/main/java/io/odpf/firehose/config/converter/RedisSinkDeploymentTypeConverter.java new file mode 100644 index 000000000..fcd476ac4 --- /dev/null +++ b/src/main/java/io/odpf/firehose/config/converter/RedisSinkDeploymentTypeConverter.java @@ -0,0 +1,13 @@ +package io.odpf.firehose.config.converter; + +import io.odpf.firehose.config.enums.RedisSinkDeploymentType; +import org.aeonbits.owner.Converter; + +import java.lang.reflect.Method; + +public class RedisSinkDeploymentTypeConverter implements Converter { + @Override + public RedisSinkDeploymentType convert(Method method, String input) { + return RedisSinkDeploymentType.valueOf(input.toUpperCase()); + } +} diff --git a/src/main/java/io/odpf/firehose/config/converter/RedisSinkTtlTypeConverter.java b/src/main/java/io/odpf/firehose/config/converter/RedisSinkTtlTypeConverter.java new file mode 100644 index 000000000..8ae0af293 --- /dev/null +++ b/src/main/java/io/odpf/firehose/config/converter/RedisSinkTtlTypeConverter.java @@ -0,0 +1,13 @@ +package io.odpf.firehose.config.converter; + +import io.odpf.firehose.config.enums.RedisSinkTtlType; +import org.aeonbits.owner.Converter; + +import java.lang.reflect.Method; + +public class RedisSinkTtlTypeConverter implements Converter { + @Override + public RedisSinkTtlType convert(Method method, String input) { + return RedisSinkTtlType.valueOf(input.toUpperCase()); + } +} diff --git 
a/src/main/java/io/odpf/firehose/config/enums/RedisSinkDataType.java b/src/main/java/io/odpf/firehose/config/enums/RedisSinkDataType.java new file mode 100644 index 000000000..c8fb593b3 --- /dev/null +++ b/src/main/java/io/odpf/firehose/config/enums/RedisSinkDataType.java @@ -0,0 +1,7 @@ +package io.odpf.firehose.config.enums; + +public enum RedisSinkDataType { + LIST, + HASHSET, + KEYVALUE, +} diff --git a/src/main/java/io/odpf/firehose/config/enums/RedisSinkDeploymentType.java b/src/main/java/io/odpf/firehose/config/enums/RedisSinkDeploymentType.java new file mode 100644 index 000000000..b2f9448aa --- /dev/null +++ b/src/main/java/io/odpf/firehose/config/enums/RedisSinkDeploymentType.java @@ -0,0 +1,6 @@ +package io.odpf.firehose.config.enums; + +public enum RedisSinkDeploymentType { + STANDALONE, + CLUSTER +} diff --git a/src/main/java/io/odpf/firehose/config/enums/RedisSinkTtlType.java b/src/main/java/io/odpf/firehose/config/enums/RedisSinkTtlType.java new file mode 100644 index 000000000..0507ca9fd --- /dev/null +++ b/src/main/java/io/odpf/firehose/config/enums/RedisSinkTtlType.java @@ -0,0 +1,7 @@ +package io.odpf.firehose.config.enums; + +public enum RedisSinkTtlType { + EXACT_TIME, + DURATION, + DISABLE +} diff --git a/src/main/java/io/odpf/firehose/sink/SinkFactory.java b/src/main/java/io/odpf/firehose/sink/SinkFactory.java index 9ba7bfa45..00ee24e26 100644 --- a/src/main/java/io/odpf/firehose/sink/SinkFactory.java +++ b/src/main/java/io/odpf/firehose/sink/SinkFactory.java @@ -3,12 +3,9 @@ import io.odpf.depot.bigquery.BigQuerySink; import io.odpf.depot.bigquery.BigQuerySinkFactory; import io.odpf.depot.config.BigQuerySinkConfig; -import io.odpf.depot.config.RedisSinkConfig; import io.odpf.depot.log.LogSink; import io.odpf.depot.log.LogSinkFactory; import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.depot.redis.RedisSink; -import io.odpf.depot.redis.RedisSinkFactory; import io.odpf.firehose.config.KafkaConsumerConfig; import 
io.odpf.firehose.config.enums.SinkType; import io.odpf.firehose.consumer.kafka.OffsetManager; @@ -23,6 +20,7 @@ import io.odpf.firehose.sink.jdbc.JdbcSinkFactory; import io.odpf.firehose.sink.mongodb.MongoSinkFactory; import io.odpf.firehose.sink.prometheus.PromSinkFactory; +import io.odpf.firehose.sink.redis.RedisSinkFactory; import io.odpf.stencil.client.StencilClient; import org.aeonbits.owner.ConfigFactory; @@ -36,7 +34,6 @@ public class SinkFactory { private final OffsetManager offsetManager; private BigQuerySinkFactory bigQuerySinkFactory; private LogSinkFactory logSinkFactory; - private RedisSinkFactory redisSinkFactory; private final Map config; public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig, @@ -60,6 +57,7 @@ public void init() { case HTTP: case INFLUXDB: case ELASTICSEARCH: + case REDIS: case GRPC: case PROMETHEUS: case BLOB: @@ -69,12 +67,6 @@ public void init() { logSinkFactory = new LogSinkFactory(config, statsDReporter); logSinkFactory.init(); return; - case REDIS: - redisSinkFactory = new RedisSinkFactory( - ConfigFactory.create(RedisSinkConfig.class, config), - statsDReporter); - redisSinkFactory.init(); - return; case BIGQUERY: BigquerySinkUtils.addMetadataColumns(config); bigQuerySinkFactory = new BigQuerySinkFactory( @@ -103,7 +95,7 @@ public Sink getSink() { case ELASTICSEARCH: return EsSinkFactory.create(config, statsDReporter, stencilClient); case REDIS: - return new GenericOdpfSink(new FirehoseInstrumentation(statsDReporter, RedisSink.class), sinkType.name(), redisSinkFactory.create()); + return RedisSinkFactory.create(config, statsDReporter, stencilClient); case GRPC: return GrpcSinkFactory.create(config, statsDReporter, stencilClient); case PROMETHEUS: diff --git a/src/main/java/io/odpf/firehose/sink/redis/RedisSink.java b/src/main/java/io/odpf/firehose/sink/redis/RedisSink.java new file mode 100644 index 000000000..808631c1a --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/RedisSink.java @@ -0,0 +1,57 @@ +package 
io.odpf.firehose.sink.redis; + +import io.odpf.firehose.message.Message; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.AbstractSink; +import io.odpf.firehose.sink.redis.client.RedisClient; +import io.odpf.firehose.sink.redis.exception.NoResponseException; + +import java.util.List; + +/** + * RedisSink allows messages consumed from kafka to be persisted to a redis. + * The related configurations for RedisSink can be found here: {@see io.odpf.firehose.config.RedisSinkConfig} + */ +public class RedisSink extends AbstractSink { + + private RedisClient redisClient; + + /** + * Instantiates a new Redis sink. + * + * @param firehoseInstrumentation the instrumentation + * @param sinkType the sink type + * @param redisClient the redis client + */ + public RedisSink(FirehoseInstrumentation firehoseInstrumentation, String sinkType, RedisClient redisClient) { + super(firehoseInstrumentation, sinkType); + this.redisClient = redisClient; + } + + /** + * process messages before sending to redis. + * + * @param messages the messages + */ + @Override + protected void prepare(List messages) { + redisClient.prepare(messages); + } + + /** + * Send data to redis. 
+ * + * @return the list + * @throws NoResponseException the no response exception + */ + @Override + protected List execute() throws NoResponseException { + return redisClient.execute(); + } + + @Override + public void close() { + getFirehoseInstrumentation().logInfo("Redis connection closing"); + redisClient.close(); + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/RedisSinkFactory.java b/src/main/java/io/odpf/firehose/sink/redis/RedisSinkFactory.java new file mode 100644 index 000000000..741fe695d --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/RedisSinkFactory.java @@ -0,0 +1,51 @@ +package io.odpf.firehose.sink.redis; + + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.AbstractSink; +import io.odpf.firehose.sink.redis.client.RedisClient; +import io.odpf.firehose.sink.redis.client.RedisClientFactory; +import io.odpf.stencil.client.StencilClient; +import org.aeonbits.owner.ConfigFactory; + +import java.util.Map; + +/** + * Factory class to create the RedisSink. + *

+ * The firehose would reflectively instantiate this factory + * using the configurations supplied and invoke {@see #create(Map < String, String > configuration, StatsDClient statsDReporter, StencilClient client)} + * to obtain the RedisSink implementation. + */ +public class RedisSinkFactory { + + /** + * Creates Redis sink. + * + * @param configuration the configuration + * @param statsDReporter the stats d reporter + * @param stencilClient the stencil client + * @return the abstract sink + */ + public static AbstractSink create(Map configuration, StatsDReporter statsDReporter, StencilClient stencilClient) { + RedisSinkConfig redisSinkConfig = ConfigFactory.create(RedisSinkConfig.class, configuration); + FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, RedisSinkFactory.class); + String redisConfig = String.format("\n\tredis.urls = %s\n\tredis.key.template = %s\n\tredis.sink.type = %s" + + "\n\tredis.list.data.proto.index = %s\n\tredis.ttl.type = %s\n\tredis.ttl.value = %d", + redisSinkConfig.getSinkRedisUrls(), + redisSinkConfig.getSinkRedisKeyTemplate(), + redisSinkConfig.getSinkRedisDataType().toString(), + redisSinkConfig.getSinkRedisListDataProtoIndex(), + redisSinkConfig.getSinkRedisTtlType().toString(), + redisSinkConfig.getSinkRedisTtlValue()); + firehoseInstrumentation.logDebug(redisConfig); + firehoseInstrumentation.logInfo("Redis server type = {}", redisSinkConfig.getSinkRedisDeploymentType()); + + RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); + RedisClient client = redisClientFactory.getClient(); + firehoseInstrumentation.logInfo("Connection to redis established successfully"); + return new RedisSink(new FirehoseInstrumentation(statsDReporter, RedisSink.class), "redis", client); + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/client/RedisClient.java b/src/main/java/io/odpf/firehose/sink/redis/client/RedisClient.java new file mode 
100644 index 000000000..66b663734 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/client/RedisClient.java @@ -0,0 +1,29 @@ +package io.odpf.firehose.sink.redis.client; + +import io.odpf.firehose.message.Message; + +import java.util.List; + +/** + * Redis client interface to be used in RedisSink. + */ +public interface RedisClient { + /** + * Process messages before sending. + * + * @param messages the messages + */ + void prepare(List messages); + + /** + * Sends the processed messages to redis. + * + * @return list of messages + */ + List execute(); + + /** + * Close the client. + */ + void close(); +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/client/RedisClientFactory.java b/src/main/java/io/odpf/firehose/sink/redis/client/RedisClientFactory.java new file mode 100644 index 000000000..7d3fac23e --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/client/RedisClientFactory.java @@ -0,0 +1,79 @@ +package io.odpf.firehose.sink.redis.client; + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.config.enums.RedisSinkDeploymentType; +import io.odpf.firehose.exception.ConfigurationException; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.proto.ProtoToFieldMapper; +import io.odpf.firehose.sink.redis.parsers.RedisParser; +import io.odpf.firehose.sink.redis.parsers.RedisParserFactory; +import io.odpf.firehose.sink.redis.ttl.RedisTtl; +import io.odpf.firehose.sink.redis.ttl.RedisTTLFactory; +import io.odpf.stencil.client.StencilClient; +import io.odpf.stencil.Parser; +import org.apache.commons.lang.StringUtils; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; + +import java.util.HashSet; + +/** + * Redis client factory. 
+ */ +public class RedisClientFactory { + + private static final String DELIMITER = ","; + private StatsDReporter statsDReporter; + private RedisSinkConfig redisSinkConfig; + private StencilClient stencilClient; + + /** + * Instantiates a new Redis client factory. + * + * @param statsDReporter the statsd reporter + * @param redisSinkConfig the redis sink config + * @param stencilClient the stencil client + */ + public RedisClientFactory(StatsDReporter statsDReporter, RedisSinkConfig redisSinkConfig, StencilClient stencilClient) { + this.statsDReporter = statsDReporter; + this.redisSinkConfig = redisSinkConfig; + this.stencilClient = stencilClient; + } + + public RedisClient getClient() { + Parser protoParser = stencilClient.getParser(redisSinkConfig.getInputSchemaProtoClass()); + ProtoToFieldMapper protoToFieldMapper = new ProtoToFieldMapper(protoParser, redisSinkConfig.getInputSchemaProtoToColumnMapping()); + RedisParser redisParser = RedisParserFactory.getParser(protoToFieldMapper, protoParser, redisSinkConfig, statsDReporter); + RedisSinkDeploymentType redisSinkDeploymentType = redisSinkConfig.getSinkRedisDeploymentType(); + RedisTtl redisTTL = RedisTTLFactory.getTTl(redisSinkConfig); + return RedisSinkDeploymentType.CLUSTER.equals(redisSinkDeploymentType) + ? 
getRedisClusterClient(redisParser, redisTTL) + : getRedisStandaloneClient(redisParser, redisTTL); + } + + private RedisStandaloneClient getRedisStandaloneClient(RedisParser redisParser, RedisTtl redisTTL) { + Jedis jedis = null; + try { + jedis = new Jedis(HostAndPort.parseString(StringUtils.trim(redisSinkConfig.getSinkRedisUrls()))); + } catch (IllegalArgumentException e) { + throw new ConfigurationException(String.format("Invalid url for redis standalone: %s", redisSinkConfig.getSinkRedisUrls())); + } + return new RedisStandaloneClient(new FirehoseInstrumentation(statsDReporter, RedisStandaloneClient.class), redisParser, redisTTL, jedis); + } + + private RedisClusterClient getRedisClusterClient(RedisParser redisParser, RedisTtl redisTTL) { + String[] redisUrls = redisSinkConfig.getSinkRedisUrls().split(DELIMITER); + HashSet nodes = new HashSet<>(); + try { + for (String redisUrl : redisUrls) { + nodes.add(HostAndPort.parseString(StringUtils.trim(redisUrl))); + } + } catch (IllegalArgumentException e) { + throw new ConfigurationException(String.format("Invalid url(s) for redis cluster: %s", redisSinkConfig.getSinkRedisUrls())); + } + JedisCluster jedisCluster = new JedisCluster(nodes); + return new RedisClusterClient(new FirehoseInstrumentation(statsDReporter, RedisClusterClient.class), redisParser, redisTTL, jedisCluster); + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/client/RedisClusterClient.java b/src/main/java/io/odpf/firehose/sink/redis/client/RedisClusterClient.java new file mode 100644 index 000000000..f72beb535 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/client/RedisClusterClient.java @@ -0,0 +1,55 @@ +package io.odpf.firehose.sink.redis.client; + +import io.odpf.firehose.message.Message; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; +import io.odpf.firehose.sink.redis.parsers.RedisParser; +import io.odpf.firehose.sink.redis.ttl.RedisTtl; +import 
redis.clients.jedis.JedisCluster; + +import java.util.ArrayList; +import java.util.List; + +/** + * Redis cluster client. + */ +public class RedisClusterClient implements RedisClient { + + private FirehoseInstrumentation firehoseInstrumentation; + private RedisParser redisParser; + private RedisTtl redisTTL; + private JedisCluster jedisCluster; + private List redisDataEntries; + + /** + * Instantiates a new Redis cluster client. + * + * @param firehoseInstrumentation the instrumentation + * @param redisParser the redis parser + * @param redisTTL the redis ttl + * @param jedisCluster the jedis cluster + */ + public RedisClusterClient(FirehoseInstrumentation firehoseInstrumentation, RedisParser redisParser, RedisTtl redisTTL, JedisCluster jedisCluster) { + this.firehoseInstrumentation = firehoseInstrumentation; + this.redisParser = redisParser; + this.redisTTL = redisTTL; + this.jedisCluster = jedisCluster; + } + + @Override + public void prepare(List messages) { + redisDataEntries = redisParser.parse(messages); + } + + @Override + public List execute() { + redisDataEntries.forEach(redisDataEntry -> redisDataEntry.pushMessage(jedisCluster, redisTTL)); + return new ArrayList<>(); + } + + @Override + public void close() { + firehoseInstrumentation.logInfo("Closing Jedis client"); + jedisCluster.close(); + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClient.java b/src/main/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClient.java new file mode 100644 index 000000000..ba6498467 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClient.java @@ -0,0 +1,67 @@ +package io.odpf.firehose.sink.redis.client; + +import io.odpf.firehose.message.Message; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; +import io.odpf.firehose.sink.redis.exception.NoResponseException; +import io.odpf.firehose.sink.redis.parsers.RedisParser; +import 
io.odpf.firehose.sink.redis.ttl.RedisTtl; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Response; + +import java.util.ArrayList; +import java.util.List; + +/** + * Redis standalone client. + */ +public class RedisStandaloneClient implements RedisClient { + + private FirehoseInstrumentation firehoseInstrumentation; + private RedisParser redisParser; + private RedisTtl redisTTL; + private Jedis jedis; + private Pipeline jedisPipelined; + + /** + * Instantiates a new Redis standalone client. + * + * @param firehoseInstrumentation the instrumentation + * @param redisParser the redis parser + * @param redisTTL the redis ttl + * @param jedis the jedis + */ + public RedisStandaloneClient(FirehoseInstrumentation firehoseInstrumentation, RedisParser redisParser, RedisTtl redisTTL, Jedis jedis) { + this.firehoseInstrumentation = firehoseInstrumentation; + this.redisParser = redisParser; + this.redisTTL = redisTTL; + this.jedis = jedis; + } + + @Override + public void prepare(List messages) { + List redisDataEntries = redisParser.parse(messages); + jedisPipelined = jedis.pipelined(); + + jedisPipelined.multi(); + redisDataEntries.forEach(redisDataEntry -> redisDataEntry.pushMessage(jedisPipelined, redisTTL)); + } + + @Override + public List execute() { + Response> responses = jedisPipelined.exec(); + firehoseInstrumentation.logDebug("jedis responses: {}", responses); + jedisPipelined.sync(); + if (responses.get() == null || responses.get().isEmpty()) { + throw new NoResponseException(); + } + return new ArrayList<>(); + } + + @Override + public void close() { + firehoseInstrumentation.logInfo("Closing Jedis client"); + jedis.close(); + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisDataEntry.java b/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisDataEntry.java new file mode 100644 index 000000000..45e06d2eb --- /dev/null +++ 
b/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisDataEntry.java @@ -0,0 +1,27 @@ +package io.odpf.firehose.sink.redis.dataentry; + +import io.odpf.firehose.sink.redis.ttl.RedisTtl; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + +/** + * The interface Redis data entry. + */ +public interface RedisDataEntry { + + /** + * Push messages to jedis pipeline. + * + * @param jedisPipelined the jedis pipelined + * @param redisTTL the redis ttl + */ + void pushMessage(Pipeline jedisPipelined, RedisTtl redisTTL); + + /** + * Push message to jedis cluster. + * + * @param jedisCluster the jedis cluster + * @param redisTTL the redis ttl + */ + void pushMessage(JedisCluster jedisCluster, RedisTtl redisTTL); +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntry.java b/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntry.java new file mode 100644 index 000000000..fac75fc9e --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntry.java @@ -0,0 +1,35 @@ +package io.odpf.firehose.sink.redis.dataentry; + +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.ttl.RedisTtl; +import lombok.AllArgsConstructor; +import lombok.Getter; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + +/** + * Class for Redis Hash set entry. 
+ */ +@AllArgsConstructor +@Getter +public class RedisHashSetFieldEntry implements RedisDataEntry { + + private String key; + private String field; + private String value; + private FirehoseInstrumentation firehoseInstrumentation; + + @Override + public void pushMessage(Pipeline jedisPipelined, RedisTtl redisTTL) { + getFirehoseInstrumentation().logDebug("key: {}, field: {}, value: {}", getKey(), getField(), getValue()); + jedisPipelined.hset(getKey(), getField(), getValue()); + redisTTL.setTtl(jedisPipelined, getKey()); + } + + @Override + public void pushMessage(JedisCluster jedisCluster, RedisTtl redisTTL) { + getFirehoseInstrumentation().logDebug("key: {}, field: {}, value: {}", getKey(), getField(), getValue()); + jedisCluster.hset(getKey(), getField(), getValue()); + redisTTL.setTtl(jedisCluster, getKey()); + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntry.java b/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntry.java new file mode 100644 index 000000000..18d23f059 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntry.java @@ -0,0 +1,48 @@ +package io.odpf.firehose.sink.redis.dataentry; + +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.ttl.RedisTtl; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + +@AllArgsConstructor +@Getter +@EqualsAndHashCode(exclude = "firehoseInstrumentation") +public class RedisKeyValueEntry implements RedisDataEntry { + + private String key; + private String value; + private FirehoseInstrumentation firehoseInstrumentation; + + @Override + public void pushMessage(Pipeline jedisPipelined, RedisTtl redisTTL) { + firehoseInstrumentation.logDebug("key: {}, value: {}", key, value); + jedisPipelined.set(key, value); + redisTTL.setTtl(jedisPipelined, key); + } + + @Override + 
public void pushMessage(JedisCluster jedisCluster, RedisTtl redisTTL) { + firehoseInstrumentation.logDebug("key: {}, value: {}", key, value); + jedisCluster.set(key, value); + redisTTL.setTtl(jedisCluster, key); + + } + + @Override + public String toString() { + return "RedisKeyValueEntry{" + + + "key='" + + key + + '\'' + + + ", value='" + value + + '\'' + + + '}'; + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntry.java b/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntry.java new file mode 100644 index 000000000..c6c9ee163 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntry.java @@ -0,0 +1,33 @@ +package io.odpf.firehose.sink.redis.dataentry; + +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.ttl.RedisTtl; +import lombok.AllArgsConstructor; +import lombok.Getter; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + +/** + * Class for Redis List entry. 
+ */ +@AllArgsConstructor +@Getter +public class RedisListEntry implements RedisDataEntry { + private String key; + private String value; + private FirehoseInstrumentation firehoseInstrumentation; + + @Override + public void pushMessage(Pipeline jedisPipelined, RedisTtl redisTTL) { + getFirehoseInstrumentation().logDebug("key: {}, value: {}", getKey(), getValue()); + jedisPipelined.lpush(getKey(), getValue()); + redisTTL.setTtl(jedisPipelined, getKey()); + } + + @Override + public void pushMessage(JedisCluster jedisCluster, RedisTtl redisTTL) { + getFirehoseInstrumentation().logDebug("key: {}, value: {}", getKey(), getValue()); + jedisCluster.lpush(getKey(), getValue()); + redisTTL.setTtl(jedisCluster, getKey()); + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/exception/NoResponseException.java b/src/main/java/io/odpf/firehose/sink/redis/exception/NoResponseException.java new file mode 100644 index 000000000..0fcfd1672 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/exception/NoResponseException.java @@ -0,0 +1,16 @@ +package io.odpf.firehose.sink.redis.exception; + +/** + * NoResponseException + *

+ * Exception to raise if there is no response from redisClient. + */ +public class NoResponseException extends RuntimeException { + + /** + * Instantiates a new No response exception. + */ + public NoResponseException() { + super("Redis Pipeline error: no responds received"); + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParser.java b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParser.java new file mode 100644 index 000000000..3478e922f --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParser.java @@ -0,0 +1,48 @@ +package io.odpf.firehose.sink.redis.parsers; + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.message.Message; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.proto.ProtoToFieldMapper; +import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; +import io.odpf.firehose.sink.redis.dataentry.RedisHashSetFieldEntry; +import com.google.protobuf.DynamicMessage; +import io.odpf.stencil.Parser; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Redis hash set parser. + */ +public class RedisHashSetParser extends RedisParser { + private ProtoToFieldMapper protoToFieldMapper; + private RedisSinkConfig redisSinkConfig; + private StatsDReporter statsDReporter; + + /** + * Instantiates a new Redis hash set parser. 
+ * @param protoToFieldMapper the proto to field mapper + * @param protoParser the proto parser + * @param redisSinkConfig the redis sink config + * @param statsDReporter the statsd reporter + */ + public RedisHashSetParser(ProtoToFieldMapper protoToFieldMapper, Parser protoParser, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) { + super(protoParser, redisSinkConfig); + this.protoToFieldMapper = protoToFieldMapper; + this.redisSinkConfig = redisSinkConfig; + this.statsDReporter = statsDReporter; + } + + @Override + public List parse(Message message) { + DynamicMessage parsedMessage = parseEsbMessage(message); + String redisKey = parseTemplate(parsedMessage, redisSinkConfig.getSinkRedisKeyTemplate()); + List messageEntries = new ArrayList<>(); + Map protoToFieldMap = protoToFieldMapper.getFields(getPayload(message)); + protoToFieldMap.forEach((key, value) -> messageEntries.add(new RedisHashSetFieldEntry(redisKey, parseTemplate(parsedMessage, key), String.valueOf(value), new FirehoseInstrumentation(statsDReporter, RedisHashSetFieldEntry.class)))); + return messageEntries; + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParser.java b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParser.java new file mode 100644 index 000000000..f5447fe78 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParser.java @@ -0,0 +1,37 @@ +package io.odpf.firehose.sink.redis.parsers; + +import com.google.protobuf.DynamicMessage; +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.message.Message; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; +import io.odpf.firehose.sink.redis.dataentry.RedisKeyValueEntry; +import io.odpf.stencil.Parser; + +import java.util.Collections; +import java.util.List; + +public class RedisKeyValueParser extends RedisParser { + 
private RedisSinkConfig redisSinkConfig; + private StatsDReporter statsDReporter; + + public RedisKeyValueParser(Parser protoParser, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) { + super(protoParser, redisSinkConfig); + this.redisSinkConfig = redisSinkConfig; + this.statsDReporter = statsDReporter; + } + + @Override + public List parse(Message message) { + DynamicMessage parsedMessage = parseEsbMessage(message); + String redisKey = parseTemplate(parsedMessage, redisSinkConfig.getSinkRedisKeyTemplate()); + String protoIndex = redisSinkConfig.getSinkRedisKeyValuetDataProtoIndex(); + if (protoIndex == null) { + throw new IllegalArgumentException("Please provide SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX in key value sink"); + } + FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, RedisKeyValueEntry.class); + RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(redisKey, getDataByFieldNumber(parsedMessage, protoIndex).toString(), firehoseInstrumentation); + return Collections.singletonList(redisKeyValueEntry); + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisListParser.java b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisListParser.java new file mode 100644 index 000000000..760c26384 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisListParser.java @@ -0,0 +1,48 @@ +package io.odpf.firehose.sink.redis.parsers; + + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.message.Message; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; +import io.odpf.firehose.sink.redis.dataentry.RedisListEntry; +import com.google.protobuf.DynamicMessage; +import io.odpf.stencil.Parser; + +import java.util.ArrayList; +import java.util.List; + +/** + * Redis list parser. 
+ */ +public class RedisListParser extends RedisParser { + private RedisSinkConfig redisSinkConfig; + private StatsDReporter statsDReporter; + + /** + * Instantiates a new Redis list parser. + * + * @param protoParser the proto parser + * @param redisSinkConfig the redis sink config + * @param statsDReporter the stats d reporter + */ + public RedisListParser(Parser protoParser, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) { + super(protoParser, redisSinkConfig); + this.redisSinkConfig = redisSinkConfig; + this.statsDReporter = statsDReporter; + } + + @Override + public List parse(Message message) { + DynamicMessage parsedMessage = parseEsbMessage(message); + String redisKey = parseTemplate(parsedMessage, redisSinkConfig.getSinkRedisKeyTemplate()); + String protoIndex = redisSinkConfig.getSinkRedisListDataProtoIndex(); + if (protoIndex == null) { + throw new IllegalArgumentException("Please provide SINK_REDIS_LIST_DATA_PROTO_INDEX in list sink"); + } + List messageEntries = new ArrayList<>(); + messageEntries.add(new RedisListEntry(redisKey, getDataByFieldNumber(parsedMessage, protoIndex).toString(), new FirehoseInstrumentation(statsDReporter, RedisListEntry.class))); + return messageEntries; + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParser.java b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParser.java new file mode 100644 index 000000000..b027c46b2 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParser.java @@ -0,0 +1,128 @@ +package io.odpf.firehose.sink.redis.parsers; + + +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.message.Message; +import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import io.odpf.stencil.Parser; +import lombok.AllArgsConstructor; +import org.apache.commons.lang3.StringUtils; 
+import org.apache.kafka.common.errors.InvalidConfigurationException; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Convert kafka messages to RedisDataEntry. + */ +@AllArgsConstructor +public abstract class RedisParser { + + private Parser protoParser; + private RedisSinkConfig redisSinkConfig; + + public abstract List parse(Message message); + + public List parse(List messages) { + return messages + .stream() + .map(this::parse) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + /** + * Parse esb message to protobuf. + * + * @param message parsed message + * @return Parsed Proto object + */ + DynamicMessage parseEsbMessage(Message message) { + DynamicMessage parsedMessage; + try { + parsedMessage = protoParser.parse(getPayload(message)); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException("Unable to parse data when reading Key", e); + } + return parsedMessage; + } + + /** + * Parse template string. + * + * @param data the data + * @param template the template + * @return parsed template + */ + String parseTemplate(DynamicMessage data, String template) { + if (StringUtils.isEmpty(template)) { + throw new IllegalArgumentException("Template '" + template + "' is invalid"); + } + String[] templateStrings = template.split(","); + if (templateStrings.length == 0) { + throw new InvalidConfigurationException("Empty key configuration: '" + template + "'"); + } + templateStrings = Arrays + .stream(templateStrings) + .map(String::trim) + .toArray(String[]::new); + String templatePattern = templateStrings[0]; + String templateVariables = StringUtils.join(Arrays.copyOfRange(templateStrings, 1, templateStrings.length), ","); + String renderedTemplate = renderStringTemplate(data, templatePattern, templateVariables); + return StringUtils.isEmpty(templateVariables) + ? 
templatePattern + : renderedTemplate; + } + + private String renderStringTemplate(DynamicMessage parsedMessage, String pattern, String patternVariables) { + if (StringUtils.isEmpty(patternVariables)) { + return pattern; + } + List patternVariableFieldNumbers = Arrays.asList(patternVariables.split(",")); + Object[] patternVariableData = patternVariableFieldNumbers + .stream() + .map(fieldNumber -> getDataByFieldNumber(parsedMessage, fieldNumber)) + .toArray(); + return String.format(pattern, patternVariableData); + } + + /** + * Gets data by field number. + * + * @param parsedMessage the parsed message + * @param fieldNumber the field number + * @return Data object + */ + Object getDataByFieldNumber(DynamicMessage parsedMessage, String fieldNumber) { + int fieldNumberInt; + try { + fieldNumberInt = Integer.parseInt(fieldNumber); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid Proto Index"); + } + Descriptors.FieldDescriptor fieldDescriptor = parsedMessage.getDescriptorForType().findFieldByNumber(fieldNumberInt); + if (fieldDescriptor == null) { + throw new IllegalArgumentException(String.format("Descriptor not found for index: %s", fieldNumber)); + } + return parsedMessage.getField(fieldDescriptor); + } + + /** + * Get payload bytes. 
+ * + * @param message the message + * @return binary payload + */ + byte[] getPayload(Message message) { + if (redisSinkConfig.getKafkaRecordParserMode().equals("key")) { + return message.getLogKey(); + } else { + return message.getLogMessage(); + } + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactory.java b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactory.java new file mode 100644 index 000000000..edfb62e79 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactory.java @@ -0,0 +1,35 @@ +package io.odpf.firehose.sink.redis.parsers; + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.proto.ProtoToFieldMapper; +import io.odpf.stencil.Parser; + +/** + * Redis parser factory. + */ +public class RedisParserFactory { + + /** + * Gets parser. + * + * @param protoToFieldMapper the proto to field mapper + * @param protoParser the proto parser + * @param redisSinkConfig the redis sink config + * @param statsDReporter the statsd reporter + * @return RedisParser + */ + public static RedisParser getParser(ProtoToFieldMapper protoToFieldMapper, Parser protoParser, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) { + + switch (redisSinkConfig.getSinkRedisDataType()) { + case LIST: + return new RedisListParser(protoParser, redisSinkConfig, statsDReporter); + case HASHSET: + return new RedisHashSetParser(protoToFieldMapper, protoParser, redisSinkConfig, statsDReporter); + case KEYVALUE: + return new RedisKeyValueParser(protoParser, redisSinkConfig, statsDReporter); + default: + return null; + } + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/ttl/DurationTtl.java b/src/main/java/io/odpf/firehose/sink/redis/ttl/DurationTtl.java new file mode 100644 index 000000000..45d3a7e06 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/ttl/DurationTtl.java @@ -0,0 +1,23 @@ +package 
io.odpf.firehose.sink.redis.ttl; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + + +@AllArgsConstructor +@Getter +public class DurationTtl implements RedisTtl { + private int ttlInSeconds; + + @Override + public void setTtl(Pipeline jedisPipelined, String key) { + jedisPipelined.expire(key, ttlInSeconds); + } + + @Override + public void setTtl(JedisCluster jedisCluster, String key) { + jedisCluster.expire(key, ttlInSeconds); + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTtl.java b/src/main/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTtl.java new file mode 100644 index 000000000..86e91ed84 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTtl.java @@ -0,0 +1,23 @@ +package io.odpf.firehose.sink.redis.ttl; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + + +@AllArgsConstructor +@Getter +public class ExactTimeTtl implements RedisTtl { + private long unixTime; + + @Override + public void setTtl(Pipeline jedisPipelined, String key) { + jedisPipelined.expireAt(key, unixTime); + } + + @Override + public void setTtl(JedisCluster jedisCluster, String key) { + jedisCluster.expireAt(key, unixTime); + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/ttl/NoRedisTtl.java b/src/main/java/io/odpf/firehose/sink/redis/ttl/NoRedisTtl.java new file mode 100644 index 000000000..ef36800f4 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/ttl/NoRedisTtl.java @@ -0,0 +1,15 @@ +package io.odpf.firehose.sink.redis.ttl; + +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + +public class NoRedisTtl implements RedisTtl { + @Override + public void setTtl(Pipeline jedisPipelined, String key) { + } + + @Override + public void setTtl(JedisCluster jedisCluster, String key) { + + } +} diff --git 
a/src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTTLFactory.java b/src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTTLFactory.java new file mode 100644 index 000000000..2ca0c70fc --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTTLFactory.java @@ -0,0 +1,22 @@ +package io.odpf.firehose.sink.redis.ttl; + +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.exception.ConfigurationException; + +public class RedisTTLFactory { + + public static RedisTtl getTTl(RedisSinkConfig redisSinkConfig) { + long redisTTLValue = redisSinkConfig.getSinkRedisTtlValue(); + if (redisTTLValue < 0) { + throw new ConfigurationException("Provide a positive TTL value"); + } + switch (redisSinkConfig.getSinkRedisTtlType()) { + case EXACT_TIME: + return new ExactTimeTtl(redisTTLValue); + case DURATION: + return new DurationTtl((int) redisTTLValue); + default: + return new NoRedisTtl(); + } + } +} diff --git a/src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTtl.java b/src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTtl.java new file mode 100644 index 000000000..e994e6e57 --- /dev/null +++ b/src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTtl.java @@ -0,0 +1,13 @@ +package io.odpf.firehose.sink.redis.ttl; + +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + +/** + * Interface for RedisTTL. 
+ */ +public interface RedisTtl { + void setTtl(Pipeline jedisPipelined, String key); + + void setTtl(JedisCluster jedisCluster, String key); +} diff --git a/src/test/java/io/odpf/firehose/config/RedisSinkDataTypeConverterTest.java b/src/test/java/io/odpf/firehose/config/RedisSinkDataTypeConverterTest.java new file mode 100644 index 000000000..03b026109 --- /dev/null +++ b/src/test/java/io/odpf/firehose/config/RedisSinkDataTypeConverterTest.java @@ -0,0 +1,47 @@ +package io.odpf.firehose.config; + +import io.odpf.firehose.config.converter.RedisSinkDataTypeConverter; +import io.odpf.firehose.config.enums.RedisSinkDataType; +import org.gradle.internal.impldep.org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class RedisSinkDataTypeConverterTest { + + private RedisSinkDataTypeConverter redisSinkDataTypeConverter; + + @Before + public void setUp() { + redisSinkDataTypeConverter = new RedisSinkDataTypeConverter(); + } + + @Test + public void shouldReturnListSinkTypeFromLowerCaseInput() { + RedisSinkDataType redisSinkDataType = redisSinkDataTypeConverter.convert(null, "list"); + Assert.assertTrue(redisSinkDataType.equals(RedisSinkDataType.LIST)); + } + + @Test + public void shouldReturnListSinkTypeFromUpperCaseInput() { + RedisSinkDataType redisSinkDataType = redisSinkDataTypeConverter.convert(null, "LIST"); + Assert.assertTrue(redisSinkDataType.equals(RedisSinkDataType.LIST)); + } + + @Test + public void shouldReturnListSinkTypeFromMixedCaseInput() { + RedisSinkDataType redisSinkDataType = redisSinkDataTypeConverter.convert(null, "LiSt"); + Assert.assertTrue(redisSinkDataType.equals(RedisSinkDataType.LIST)); + } + + @Test + public void shouldReturnHashSetSinkTypeFromInput() { + RedisSinkDataType redisSinkDataType = redisSinkDataTypeConverter.convert(null, "hashset"); + Assert.assertTrue(redisSinkDataType.equals(RedisSinkDataType.HASHSET)); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnIllegalArgument() { 
+ redisSinkDataTypeConverter.convert(null, ""); + } + +} diff --git a/src/test/java/io/odpf/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java b/src/test/java/io/odpf/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java index 21c7c0303..a9e0637d7 100644 --- a/src/test/java/io/odpf/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java +++ b/src/test/java/io/odpf/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java @@ -2,10 +2,10 @@ import io.odpf.depot.error.ErrorInfo; import io.odpf.depot.error.ErrorType; -import io.odpf.firehose.exception.DeserializerException; import io.odpf.firehose.message.Message; -import io.odpf.firehose.sink.common.blobstorage.BlobStorage; +import io.odpf.firehose.exception.DeserializerException; import io.odpf.firehose.sink.common.blobstorage.BlobStorageException; +import io.odpf.firehose.sink.common.blobstorage.BlobStorage; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -16,10 +16,13 @@ import java.io.IOException; import java.time.Instant; import java.util.Arrays; +import java.util.Base64; import java.util.Comparator; import java.util.List; import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; @RunWith(MockitoJUnitRunner.class) public class BlobStorageDlqWriterTest { @@ -47,12 +50,14 @@ public void shouldWriteMessagesWithoutErrorInfoToObjectStorage() throws IOExcept List messages = Arrays.asList(message1, message2, message3, message4); Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size()); + String key = Base64.getEncoder().encodeToString("123".getBytes()); + String message = Base64.getEncoder().encodeToString("abc".getBytes()); verify(blobStorage).store(contains("booking/2020-01-02"), - eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}\n" - + 
"{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}").getBytes())); + eq(("{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"DESERIALIZATION_ERROR\"}\n" + + "{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"DESERIALIZATION_ERROR\"}").getBytes())); verify(blobStorage).store(contains("booking/2020-01-01"), - eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}\n" - + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}").getBytes())); + eq(("{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"DESERIALIZATION_ERROR\"}\n" + + "{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"DESERIALIZATION_ERROR\"}").getBytes())); } @Test @@ -68,12 +73,14 @@ public void shouldWriteMessageErrorTypesToObjectStorage() throws IOException, Bl List messages = Arrays.asList(message1, message2, message3, message4); Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size()); + String key = Base64.getEncoder().encodeToString("123".getBytes()); + String message = Base64.getEncoder().encodeToString("abc".getBytes()); verify(blobStorage).store(contains("booking/2020-01-02"), - eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: 
DESERIALIZATION_ERROR\"}\n" - + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: SINK_UNKNOWN_ERROR\"}").getBytes())); + eq(("{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"DESERIALIZATION_ERROR\"}\n" + + "{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"SINK_UNKNOWN_ERROR\"}").getBytes())); verify(blobStorage).store(contains("booking/2020-01-01"), - eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\"}\n" - + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception null, ErrorType: SINK_UNKNOWN_ERROR\"}").getBytes())); + eq(("{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"DESERIALIZATION_ERROR\"}\n" + + "{\"key\":\"" + key + "\",\"value\":\"" + message + "\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"SINK_UNKNOWN_ERROR\"}").getBytes())); } @Test diff --git a/src/test/java/io/odpf/firehose/sink/redis/RedisSinkFactoryTest.java b/src/test/java/io/odpf/firehose/sink/redis/RedisSinkFactoryTest.java new file mode 100644 index 000000000..a42bc7a14 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/RedisSinkFactoryTest.java @@ -0,0 +1,41 @@ +package io.odpf.firehose.sink.redis; + + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.sink.AbstractSink; +import io.odpf.stencil.client.StencilClient; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import 
org.mockito.MockitoAnnotations; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class RedisSinkFactoryTest { + private Map configuration; + + @Mock + private StatsDReporter statsDReporter; + + @Mock + private StencilClient stencilClient; + + @Before + public void setUp() { + configuration = new HashMap<>(); + MockitoAnnotations.initMocks(this); + } + + @Test + public void shouldCreateRedisSink() { + configuration.put("SINK_REDIS_URLS", "localhost:6379"); + configuration.put("SINK_REDIS_KEY_TEMPLATE", "test_%%s,6"); + configuration.put("SINK_REDIS_LIST_DATA_PROTO_INDEX", "3"); + + AbstractSink sink = RedisSinkFactory.create(configuration, statsDReporter, stencilClient); + assertEquals(RedisSink.class, sink.getClass()); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/RedisSinkTest.java b/src/test/java/io/odpf/firehose/sink/redis/RedisSinkTest.java new file mode 100644 index 000000000..6b22704a7 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/RedisSinkTest.java @@ -0,0 +1,98 @@ +package io.odpf.firehose.sink.redis; + +import io.odpf.firehose.message.Message; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.client.RedisClient; +import io.odpf.firehose.sink.redis.exception.NoResponseException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.time.Instant; +import java.util.ArrayList; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class RedisSinkTest { + @Mock + private RedisClient redisClient; + @Mock + private FirehoseInstrumentation firehoseInstrumentation; + private RedisSink redis; + + @Before + public void setup() { + when(firehoseInstrumentation.startExecution()).thenReturn(Instant.now()); + redis = new RedisSink(firehoseInstrumentation, "redis", 
redisClient); + } + + @Test + public void shouldInvokeExecuteOnTheClient() { + redis.execute(); + + verify(redisClient).execute(); + } + + @Test + public void shouldInvokePrepareOnTheClient() { + ArrayList messages = new ArrayList<>(); + + redis.prepare(messages); + + verify(redisClient).prepare(messages); + } + + @Test + public void shouldInvokeCloseOnTheClient() { + redis.close(); + + verify(redisClient).close(); + } + + @Test + public void shouldLogWhenClosingConnection() { + redis.close(); + + verify(firehoseInstrumentation, times(1)).logInfo("Redis connection closing"); + } + + @Test + public void sendsMetricsForSuccessMessages() { + ArrayList messages = new ArrayList<>(); + + redis.pushMessage(messages); + + verify(firehoseInstrumentation, times(1)).capturePreExecutionLatencies(messages); + verify(firehoseInstrumentation, times(1)).startExecution(); + verify(firehoseInstrumentation, times(1)).logInfo("Preparing {} messages", messages.size()); + verify(firehoseInstrumentation, times(1)).captureSinkExecutionTelemetry(any(), any()); + InOrder inOrder = inOrder(firehoseInstrumentation); + inOrder.verify(firehoseInstrumentation).logInfo("Preparing {} messages", messages.size()); + inOrder.verify(firehoseInstrumentation).capturePreExecutionLatencies(messages); + inOrder.verify(firehoseInstrumentation).startExecution(); + inOrder.verify(firehoseInstrumentation).captureSinkExecutionTelemetry(any(), any()); + } + + @Test + public void sendsMetricsForFailedMessages() { + when(redisClient.execute()).thenThrow(new NoResponseException()); + ArrayList messages = new ArrayList<>(); + + redis.pushMessage(messages); + + verify(firehoseInstrumentation, times(1)).capturePreExecutionLatencies(messages); + verify(firehoseInstrumentation, times(1)).startExecution(); + verify(firehoseInstrumentation, times(1)).logInfo("Preparing {} messages", messages.size()); + verify(firehoseInstrumentation, times(1)).captureSinkExecutionTelemetry(any(), any()); + InOrder inOrder = 
inOrder(firehoseInstrumentation); + inOrder.verify(firehoseInstrumentation).logInfo("Preparing {} messages", messages.size()); + inOrder.verify(firehoseInstrumentation).capturePreExecutionLatencies(messages); + inOrder.verify(firehoseInstrumentation).startExecution(); + inOrder.verify(firehoseInstrumentation).captureSinkExecutionTelemetry(any(), any()); + } + + +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/client/RedisClientFactoryTest.java b/src/test/java/io/odpf/firehose/sink/redis/client/RedisClientFactoryTest.java new file mode 100644 index 000000000..8ba8c2ad2 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/client/RedisClientFactoryTest.java @@ -0,0 +1,118 @@ +package io.odpf.firehose.sink.redis.client; + + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.config.enums.RedisSinkDeploymentType; +import io.odpf.firehose.config.enums.RedisSinkDataType; +import io.odpf.firehose.config.enums.RedisSinkTtlType; +import io.odpf.firehose.exception.ConfigurationException; +import io.odpf.stencil.client.StencilClient; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RedisClientFactoryTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Mock + private RedisSinkConfig redisSinkConfig; + + @Mock + private StencilClient stencilClient; + + @Mock + private StatsDReporter statsDReporter; + + @Test + public void shouldGetStandaloneClient() { + when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.LIST); + when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); + 
when(redisSinkConfig.getSinkRedisDeploymentType()).thenReturn(RedisSinkDeploymentType.STANDALONE); + when(redisSinkConfig.getSinkRedisUrls()).thenReturn("0.0.0.0:0"); + + RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); + + RedisClient client = redisClientFactory.getClient(); + + Assert.assertEquals(RedisStandaloneClient.class, client.getClass()); + } + + @Test + public void shouldGetStandaloneClientWhenURLHasSpaces() { + when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.LIST); + when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); + when(redisSinkConfig.getSinkRedisDeploymentType()).thenReturn(RedisSinkDeploymentType.STANDALONE); + when(redisSinkConfig.getSinkRedisUrls()).thenReturn(" 0.0.0.0:0 "); + RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); + + RedisClient client = redisClientFactory.getClient(); + + Assert.assertEquals(RedisStandaloneClient.class, client.getClass()); + } + + @Test + public void shouldGetClusterClient() { + when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.LIST); + when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); + when(redisSinkConfig.getSinkRedisDeploymentType()).thenReturn(RedisSinkDeploymentType.CLUSTER); + when(redisSinkConfig.getSinkRedisUrls()).thenReturn("0.0.0.0:0, 1.1.1.1:1"); + RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); + + RedisClient client = redisClientFactory.getClient(); + + Assert.assertEquals(RedisClusterClient.class, client.getClass()); + } + + @Test + public void shouldGetClusterClientWhenURLHasSpaces() { + when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.LIST); + when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); + 
when(redisSinkConfig.getSinkRedisDeploymentType()).thenReturn(RedisSinkDeploymentType.CLUSTER); + when(redisSinkConfig.getSinkRedisUrls()).thenReturn(" 0.0.0.0:0, 1.1.1.1:1 "); + RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); + + RedisClient client = redisClientFactory.getClient(); + + Assert.assertEquals(RedisClusterClient.class, client.getClass()); + } + + @Test + public void shouldThrowExceptionWhenUrlIsInvalidForCluster() { + expectedException.expect(ConfigurationException.class); + expectedException.expectMessage("Invalid url(s) for redis cluster: localhost:6379,localhost:6378,localhost"); + + when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.LIST); + when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); + when(redisSinkConfig.getSinkRedisDeploymentType()).thenReturn(RedisSinkDeploymentType.CLUSTER); + when(redisSinkConfig.getSinkRedisUrls()).thenReturn("localhost:6379,localhost:6378,localhost"); + + RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); + + redisClientFactory.getClient(); + } + + @Test + public void shouldThrowExceptionWhenUrlIsInvalidForStandalone() { + expectedException.expect(ConfigurationException.class); + expectedException.expectMessage("Invalid url for redis standalone: localhost"); + + when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.LIST); + when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); + when(redisSinkConfig.getSinkRedisDeploymentType()).thenReturn(RedisSinkDeploymentType.STANDALONE); + when(redisSinkConfig.getSinkRedisUrls()).thenReturn("localhost"); + + RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); + + redisClientFactory.getClient(); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/client/RedisClusterClientTest.java 
b/src/test/java/io/odpf/firehose/sink/redis/client/RedisClusterClientTest.java new file mode 100644 index 000000000..75ea4e909 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/client/RedisClusterClientTest.java @@ -0,0 +1,114 @@ +package io.odpf.firehose.sink.redis.client; + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.message.Message; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; +import io.odpf.firehose.sink.redis.dataentry.RedisHashSetFieldEntry; +import io.odpf.firehose.sink.redis.dataentry.RedisListEntry; +import io.odpf.firehose.sink.redis.parsers.RedisParser; +import io.odpf.firehose.sink.redis.ttl.RedisTtl; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.junit.MockitoJUnitRunner; +import redis.clients.jedis.JedisCluster; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class RedisClusterClientTest { + @Mock + private StatsDReporter statsDReporter; + + @Mock + private FirehoseInstrumentation firehoseInstrumentation; + + private final RedisHashSetFieldEntry firstRedisSetEntry = new RedisHashSetFieldEntry("key1", "field1", "value1", new FirehoseInstrumentation(statsDReporter, RedisHashSetFieldEntry.class)); + private final RedisHashSetFieldEntry secondRedisSetEntry = new RedisHashSetFieldEntry("key2", "field2", "value2", new FirehoseInstrumentation(statsDReporter, RedisHashSetFieldEntry.class)); + private final RedisListEntry firstRedisListEntry = new RedisListEntry("key1", "value1", new FirehoseInstrumentation(statsDReporter, RedisListEntry.class)); + private final RedisListEntry secondRedisListEntry = new RedisListEntry("key2", "value2", new FirehoseInstrumentation(statsDReporter, 
RedisListEntry.class)); + @Mock + private JedisCluster jedisCluster; + + @Mock + private RedisParser redisParser; + + @Mock + private RedisTtl redisTTL; + private List messages; + private RedisClusterClient redisClusterClient; + private ArrayList redisDataEntries; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + messages = Arrays.asList(new Message(new byte[0], new byte[0], "topic", 0, 100), + new Message(new byte[0], new byte[0], "topic", 0, 100)); + + redisClusterClient = new RedisClusterClient(firehoseInstrumentation, redisParser, redisTTL, jedisCluster); + + redisDataEntries = new ArrayList<>(); + + when(redisParser.parse(messages)).thenReturn(redisDataEntries); + } + + @Test + public void shouldParseEsbMessagesWhenPreparing() { + redisClusterClient.prepare(messages); + + verify(redisParser).parse(messages); + } + + @Test + public void shouldSendAllListDataWhenExecuting() { + populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); + + redisClusterClient.prepare(messages); + redisClusterClient.execute(); + + verify(jedisCluster).lpush(firstRedisListEntry.getKey(), firstRedisListEntry.getValue()); + verify(jedisCluster).lpush(secondRedisListEntry.getKey(), secondRedisListEntry.getValue()); + } + + @Test + public void shouldSendAllSetDataWhenExecuting() { + populateRedisDataEntry(firstRedisSetEntry, secondRedisSetEntry); + + redisClusterClient.prepare(messages); + redisClusterClient.execute(); + + verify(jedisCluster).hset(firstRedisSetEntry.getKey(), firstRedisSetEntry.getField(), firstRedisSetEntry.getValue()); + verify(jedisCluster).hset(secondRedisSetEntry.getKey(), secondRedisSetEntry.getField(), secondRedisSetEntry.getValue()); + } + + @Test + public void shouldReturnEmptyArrayAfterExecuting() { + populateRedisDataEntry(firstRedisSetEntry, secondRedisSetEntry); + + redisClusterClient.prepare(messages); + List retryElements = redisClusterClient.execute(); + + Assert.assertEquals(0, retryElements.size()); + } + + @Test + 
public void shouldCloseTheJedisClient() { + redisClusterClient.close(); + + verify(firehoseInstrumentation, times(1)).logInfo("Closing Jedis client"); + verify(jedisCluster).close(); + } + + + private void populateRedisDataEntry(RedisDataEntry... redisData) { + redisDataEntries.addAll(Arrays.asList(redisData)); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClientTest.java b/src/test/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClientTest.java new file mode 100644 index 000000000..8bdc67c78 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClientTest.java @@ -0,0 +1,194 @@ +package io.odpf.firehose.sink.redis.client; + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.message.Message; +import io.odpf.firehose.exception.DeserializerException; +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; +import io.odpf.firehose.sink.redis.dataentry.RedisHashSetFieldEntry; +import io.odpf.firehose.sink.redis.dataentry.RedisListEntry; +import io.odpf.firehose.sink.redis.exception.NoResponseException; +import io.odpf.firehose.sink.redis.parsers.RedisParser; +import io.odpf.firehose.sink.redis.ttl.RedisTtl; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.junit.MockitoJUnitRunner; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Response; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class RedisStandaloneClientTest { + @Mock + private StatsDReporter statsDReporter; + @Mock + private 
FirehoseInstrumentation firehoseInstrumentation; + + private final RedisHashSetFieldEntry firstRedisSetEntry = new RedisHashSetFieldEntry("key1", "field1", "value1", new FirehoseInstrumentation(statsDReporter, RedisHashSetFieldEntry.class)); + private final RedisHashSetFieldEntry secondRedisSetEntry = new RedisHashSetFieldEntry("key2", "field2", "value2", new FirehoseInstrumentation(statsDReporter, RedisHashSetFieldEntry.class)); + private final RedisListEntry firstRedisListEntry = new RedisListEntry("key1", "value1", new FirehoseInstrumentation(statsDReporter, RedisListEntry.class)); + private final RedisListEntry secondRedisListEntry = new RedisListEntry("key2", "value2", new FirehoseInstrumentation(statsDReporter, RedisListEntry.class)); + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private RedisClient redisClient; + private List messages; + private List redisDataEntries; + @Mock + private RedisParser redisMessageParser; + + @Mock + private RedisTtl redisTTL; + + @Mock + private Jedis jedis; + + @Mock + private Pipeline jedisPipeline; + + @Mock + private Response> responses; + + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + messages = Arrays.asList(new Message(new byte[0], new byte[0], "topic", 0, 100), + new Message(new byte[0], new byte[0], "topic", 0, 100)); + + redisClient = new RedisStandaloneClient(firehoseInstrumentation, redisMessageParser, redisTTL, jedis); + + redisDataEntries = new ArrayList<>(); + + when(jedis.pipelined()).thenReturn(jedisPipeline); + when(redisMessageParser.parse(messages)).thenReturn(redisDataEntries); + } + + @Test + public void pushesDataEntryForListInATransaction() throws DeserializerException { + populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); + + redisClient.prepare(messages); + + verify(jedisPipeline, times(1)).multi(); + verify(jedisPipeline).lpush(firstRedisListEntry.getKey(), firstRedisListEntry.getValue()); + 
verify(jedisPipeline).lpush(secondRedisListEntry.getKey(), secondRedisListEntry.getValue()); + } + + @Test + public void setsTTLForListItemsInATransaction() throws DeserializerException { + populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); + + redisClient.prepare(messages); + + verify(redisTTL).setTtl(jedisPipeline, firstRedisListEntry.getKey()); + verify(redisTTL).setTtl(jedisPipeline, secondRedisListEntry.getKey()); + } + + @Test + public void pushesDataEntryForSetInATransaction() throws DeserializerException { + populateRedisDataEntry(firstRedisSetEntry, secondRedisSetEntry); + + redisClient.prepare(messages); + + verify(jedisPipeline, times(1)).multi(); + verify(jedisPipeline).hset(firstRedisSetEntry.getKey(), firstRedisSetEntry.getField(), firstRedisSetEntry.getValue()); + verify(jedisPipeline).hset(secondRedisSetEntry.getKey(), secondRedisSetEntry.getField(), secondRedisSetEntry.getValue()); + } + + @Test + public void setsTTLForSetItemsInATransaction() throws DeserializerException { + populateRedisDataEntry(firstRedisSetEntry, secondRedisSetEntry); + + redisClient.prepare(messages); + + verify(redisTTL).setTtl(jedisPipeline, firstRedisSetEntry.getKey()); + verify(redisTTL).setTtl(jedisPipeline, secondRedisSetEntry.getKey()); + } + + @Test + public void shouldCompleteTransactionInExec() { + populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); + when(jedisPipeline.exec()).thenReturn(responses); + when(responses.get()).thenReturn(Collections.singletonList("MOCK_LIST_ITEM")); + + redisClient.prepare(messages); + redisClient.execute(); + + verify(jedisPipeline).exec(); + verify(firehoseInstrumentation, times(1)).logDebug("jedis responses: {}", responses); + } + + @Test + public void shouldWaitForResponseInExec() { + populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); + when(jedisPipeline.exec()).thenReturn(responses); + when(responses.get()).thenReturn(Collections.singletonList("MOCK_LIST_ITEM")); + + 
redisClient.prepare(messages); + redisClient.execute(); + + verify(jedisPipeline).sync(); + } + + @Test + public void shouldThrowExceptionWhenResponseIsNullInExec() { + expectedException.expect(NoResponseException.class); + + populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); + when(jedisPipeline.exec()).thenReturn(responses); + when(responses.get()).thenReturn(null); + + redisClient.prepare(messages); + redisClient.execute(); + } + + @Test + public void shouldThrowExceptionWhenResponseIsEmptyInExec() { + expectedException.expect(NoResponseException.class); + + populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); + when(jedisPipeline.exec()).thenReturn(responses); + when(responses.get()).thenReturn(new ArrayList<>()); + + redisClient.prepare(messages); + redisClient.execute(); + } + + @Test + public void shouldReturnEmptyArrayInExec() { + populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); + when(jedisPipeline.exec()).thenReturn(responses); + when(responses.get()).thenReturn(Collections.singletonList("MOCK_LIST_ITEM")); + + redisClient.prepare(messages); + List elementsToRetry = redisClient.execute(); + + Assert.assertEquals(0, elementsToRetry.size()); + } + + @Test + public void shouldCloseTheClient() { + redisClient.close(); + + verify(firehoseInstrumentation, times(1)).logInfo("Closing Jedis client"); + verify(jedis, times(1)).close(); + } + + + private void populateRedisDataEntry(RedisDataEntry... 
redisData) { + redisDataEntries.addAll(Arrays.asList(redisData)); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntryTest.java b/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntryTest.java new file mode 100644 index 000000000..c2fb58ad0 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntryTest.java @@ -0,0 +1,105 @@ +package io.odpf.firehose.sink.redis.dataentry; + +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.ttl.DurationTtl; +import io.odpf.firehose.sink.redis.ttl.ExactTimeTtl; +import io.odpf.firehose.sink.redis.ttl.NoRedisTtl; +import io.odpf.firehose.sink.redis.ttl.RedisTtl; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.junit.MockitoJUnitRunner; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class RedisHashSetFieldEntryTest { + @Mock + private FirehoseInstrumentation firehoseInstrumentation; + + @Mock + private Pipeline pipeline; + + @Mock + private JedisCluster jedisCluster; + + private RedisTtl redisTTL; + private RedisHashSetFieldEntry redisHashSetFieldEntry; + private InOrder inOrderPipeline; + private InOrder inOrderJedis; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + redisTTL = new NoRedisTtl(); + redisHashSetFieldEntry = new RedisHashSetFieldEntry("test-key", "test-field", "test-value", firehoseInstrumentation); + inOrderPipeline = Mockito.inOrder(pipeline); + inOrderJedis = Mockito.inOrder(jedisCluster); + } + + @Test + public void shouldIOnlyPushDataWithoutTTLByDefaultForPipeline() { + redisHashSetFieldEntry.pushMessage(pipeline, redisTTL); + + verify(pipeline, 
times(1)).hset("test-key", "test-field", "test-value"); + verify(pipeline, times(0)).expireAt(any(String.class), any(Long.class)); + verify(pipeline, times(0)).expireAt(any(String.class), any(Long.class)); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, field: {}, value: {}", "test-key", "test-field", "test-value"); + } + + @Test + public void shouldSetProperTTLForExactTimeForPipeline() { + redisTTL = new ExactTimeTtl(1000L); + redisHashSetFieldEntry.pushMessage(pipeline, redisTTL); + + inOrderPipeline.verify(pipeline, times(1)).hset("test-key", "test-field", "test-value"); + inOrderPipeline.verify(pipeline, times(1)).expireAt("test-key", 1000L); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, field: {}, value: {}", "test-key", "test-field", "test-value"); + } + + @Test + public void shouldSetProperTTLForDurationForPipeline() { + redisTTL = new DurationTtl(1000); + redisHashSetFieldEntry.pushMessage(pipeline, redisTTL); + + inOrderPipeline.verify(pipeline, times(1)).hset("test-key", "test-field", "test-value"); + inOrderPipeline.verify(pipeline, times(1)).expire("test-key", 1000); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, field: {}, value: {}", "test-key", "test-field", "test-value"); + } + + @Test + public void shouldIOnlyPushDataWithoutTTLByDefaultForCluster() { + redisHashSetFieldEntry.pushMessage(jedisCluster, redisTTL); + + verify(jedisCluster, times(1)).hset("test-key", "test-field", "test-value"); + verify(jedisCluster, times(0)).expireAt(any(String.class), any(Long.class)); + verify(jedisCluster, times(0)).expireAt(any(String.class), any(Long.class)); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, field: {}, value: {}", "test-key", "test-field", "test-value"); + } + + @Test + public void shouldSetProperTTLForExactTimeForCluster() { + redisTTL = new ExactTimeTtl(1000L); + redisHashSetFieldEntry.pushMessage(jedisCluster, redisTTL); + + inOrderJedis.verify(jedisCluster, 
times(1)).hset("test-key", "test-field", "test-value"); + inOrderJedis.verify(jedisCluster, times(1)).expireAt("test-key", 1000L); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, field: {}, value: {}", "test-key", "test-field", "test-value"); + } + + @Test + public void shouldSetProperTTLForDurationForCluster() { + redisTTL = new DurationTtl(1000); + redisHashSetFieldEntry.pushMessage(jedisCluster, redisTTL); + + inOrderJedis.verify(jedisCluster, times(1)).hset("test-key", "test-field", "test-value"); + inOrderJedis.verify(jedisCluster, times(1)).expire("test-key", 1000); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, field: {}, value: {}", "test-key", "test-field", "test-value"); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntryTest.java b/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntryTest.java new file mode 100644 index 000000000..31f0f2a46 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntryTest.java @@ -0,0 +1,100 @@ +package io.odpf.firehose.sink.redis.dataentry; + +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.ttl.DurationTtl; +import io.odpf.firehose.sink.redis.ttl.NoRedisTtl; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + +import static org.mockito.Mockito.*; + +public class RedisKeyValueEntryTest { + @Mock + private FirehoseInstrumentation firehoseInstrumentation; + + @Mock + private Pipeline pipeline; + + @Mock + private JedisCluster jedisCluster; + + private InOrder inOrderPipeline; + private InOrder inOrderJedis; + + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + inOrderPipeline = Mockito.inOrder(pipeline); + inOrderJedis = Mockito.inOrder(jedisCluster); 
+ + } + + @Test + public void pushMessageWithNoTtl() { + String key = "key"; + String value = "value"; + RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, firehoseInstrumentation); + redisKeyValueEntry.pushMessage(pipeline, new NoRedisTtl()); + inOrderPipeline.verify(pipeline, times(1)).set(key, value); + inOrderPipeline.verify(pipeline, times(0)).expireAt(any(String.class), any(Long.class)); + + } + + @Test + public void pushMessageWithTtl() { + String key = "key"; + String value = "value"; + RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, firehoseInstrumentation); + redisKeyValueEntry.pushMessage(pipeline, new DurationTtl(100)); + inOrderPipeline.verify(pipeline, times(1)).set(key, value); + inOrderPipeline.verify(pipeline, times(1)).expire(key, 100); + } + + @Test + public void pushMessageVerifyInstrumentation() { + String key = "this-key"; + String value = "john"; + RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, firehoseInstrumentation); + redisKeyValueEntry.pushMessage(pipeline, new DurationTtl(100)); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", key, value); + } + + + @Test + public void pushMessageWithNoTtlUsingJedisCluster() { + String key = "key"; + String value = "value"; + RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, firehoseInstrumentation); + redisKeyValueEntry.pushMessage(jedisCluster, new NoRedisTtl()); + inOrderJedis.verify(jedisCluster, times(1)).set(key, value); + inOrderJedis.verify(jedisCluster, times(0)).expireAt(any(String.class), any(Long.class)); + + } + + @Test + public void pushMessageWithTtlUsingJedisCluster() { + String key = "key"; + String value = "value"; + RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, firehoseInstrumentation); + redisKeyValueEntry.pushMessage(jedisCluster, new DurationTtl(100)); + inOrderJedis.verify(jedisCluster, times(1)).set(key, value); + 
inOrderJedis.verify(jedisCluster, times(1)).expire(key, 100); + } + + @Test + public void pushMessageVerifyInstrumentationUsingJedisCluster() { + String key = "this-key"; + String value = "john"; + RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, firehoseInstrumentation); + redisKeyValueEntry.pushMessage(jedisCluster, new DurationTtl(100)); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", key, value); + } + +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntryTest.java b/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntryTest.java new file mode 100644 index 000000000..d4583f633 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntryTest.java @@ -0,0 +1,94 @@ +package io.odpf.firehose.sink.redis.dataentry; + +import io.odpf.firehose.metrics.FirehoseInstrumentation; +import io.odpf.firehose.sink.redis.ttl.DurationTtl; +import io.odpf.firehose.sink.redis.ttl.ExactTimeTtl; +import io.odpf.firehose.sink.redis.ttl.NoRedisTtl; +import io.odpf.firehose.sink.redis.ttl.RedisTtl; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class RedisListEntryTest { + + @Mock + private FirehoseInstrumentation firehoseInstrumentation; + + @Mock + private Pipeline pipeline; + + @Mock + private JedisCluster jedisCluster; + + private RedisTtl redisTTL; + private RedisListEntry redisListEntry; + + @Before + public void setup() { + redisTTL = new NoRedisTtl(); + redisListEntry = new RedisListEntry("test-key", "test-value", firehoseInstrumentation); + } + + @Test + public void shouldIOnlyPushDataWithoutTTLByDefaultForPipeline() { + redisListEntry.pushMessage(pipeline, redisTTL); + + verify(pipeline, 
times(1)).lpush("test-key", "test-value"); + verify(pipeline, times(0)).expireAt(any(String.class), any(Long.class)); + verify(pipeline, times(0)).expireAt(any(String.class), any(Long.class)); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", "test-key", "test-value"); + } + + @Test + public void shouldSetProperTTLForExactTimeForPipeline() { + redisTTL = new ExactTimeTtl(1000L); + redisListEntry.pushMessage(pipeline, redisTTL); + + verify(pipeline, times(1)).expireAt("test-key", 1000L); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", "test-key", "test-value"); + } + + @Test + public void shouldSetProperTTLForDurationForPipeline() { + redisTTL = new DurationTtl(1000); + redisListEntry.pushMessage(pipeline, redisTTL); + + verify(pipeline, times(1)).expire("test-key", 1000); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", "test-key", "test-value"); + } + + @Test + public void shouldIOnlyPushDataWithoutTTLByDefaultForCluster() { + redisListEntry.pushMessage(jedisCluster, redisTTL); + + verify(jedisCluster, times(1)).lpush("test-key", "test-value"); + verify(jedisCluster, times(0)).expireAt(any(String.class), any(Long.class)); + verify(jedisCluster, times(0)).expireAt(any(String.class), any(Long.class)); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", "test-key", "test-value"); + } + + @Test + public void shouldSetProperTTLForExactTimeForCluster() { + redisTTL = new ExactTimeTtl(1000L); + redisListEntry.pushMessage(jedisCluster, redisTTL); + + verify(jedisCluster, times(1)).expireAt("test-key", 1000L); + verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", "test-key", "test-value"); + } + + @Test + public void shouldSetProperTTLForDurationForCluster() { + redisTTL = new DurationTtl(1000); + redisListEntry.pushMessage(jedisCluster, redisTTL); + + verify(jedisCluster, times(1)).expire("test-key", 1000); + verify(firehoseInstrumentation, 
times(1)).logDebug("key: {}, value: {}", "test-key", "test-value"); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParserTest.java b/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParserTest.java new file mode 100644 index 000000000..407425813 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParserTest.java @@ -0,0 +1,336 @@ +package io.odpf.firehose.sink.redis.parsers; + + + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.config.enums.RedisSinkDataType; +import io.odpf.firehose.message.Message; +import io.odpf.firehose.consumer.TestKey; +import io.odpf.firehose.consumer.TestMessage; +import io.odpf.firehose.consumer.TestBookingLogMessage; +import io.odpf.firehose.consumer.TestNestedRepeatedMessage; +import io.odpf.firehose.proto.ProtoToFieldMapper; +import io.odpf.firehose.sink.redis.dataentry.RedisHashSetFieldEntry; +import io.odpf.stencil.client.ClassLoadStencilClient; +import io.odpf.stencil.Parser; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.IllegalFormatConversionException; +import java.util.Properties; +import java.util.UnknownFormatConversionException; + +import static junit.framework.TestCase.assertEquals; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RedisHashSetParserTest { + + private final long bookingCustomerTotalFare = 2000L; + private final float bookingAmountPaidByCash = 12.3F; + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Mock + private RedisSinkConfig redisSinkConfig; + + @Mock + private StatsDReporter statsDReporter; + + private Message message; + private Parser testKeyProtoParser; + private Parser 
testMessageProtoParser; + private ClassLoadStencilClient stencilClient; + private Message bookingMessage; + private Parser bookingMessageProtoParser; + private String bookingOrderNumber = "booking-order-1"; + + @Before + public void setUp() throws Exception { + + TestKey testKey = TestKey.newBuilder().setOrderNumber("ORDER-1-FROM-KEY").build(); + TestBookingLogMessage testBookingLogMessage = TestBookingLogMessage.newBuilder().setOrderNumber(bookingOrderNumber).setCustomerTotalFareWithoutSurge(bookingCustomerTotalFare).setAmountPaidByCash(bookingAmountPaidByCash).build(); + TestMessage testMessage = TestMessage.newBuilder().setOrderNumber("test-order").setOrderDetails("ORDER-DETAILS").build(); + this.message = new Message(testKey.toByteArray(), testMessage.toByteArray(), "test", 1, 11); + this.bookingMessage = new Message(testKey.toByteArray(), testBookingLogMessage.toByteArray(), "test", 1, 11); + stencilClient = new ClassLoadStencilClient(); + testMessageProtoParser = stencilClient.getParser(TestMessage.class.getCanonicalName()); + bookingMessageProtoParser = stencilClient.getParser(TestBookingLogMessage.class.getCanonicalName()); + testKeyProtoParser = stencilClient.getParser(TestKey.class.getCanonicalName()); + } + + private void setRedisSinkConfig(String parserMode, String collectionKeyTemplate, RedisSinkDataType redisSinkDataType) { + when(redisSinkConfig.getKafkaRecordParserMode()).thenReturn(parserMode); + when(redisSinkConfig.getSinkRedisKeyTemplate()).thenReturn(collectionKeyTemplate); + } + + @Test + public void shouldParseStringMessageForCollectionKeyTemplate() { + setRedisSinkConfig("message", "Test-%s,1", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForTestMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("3", "details")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForTestMessage, testMessageProtoParser, redisSinkConfig, statsDReporter); + + RedisHashSetFieldEntry 
redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(message).get(0); + + assertEquals("ORDER-DETAILS", redisHashSetFieldEntry.getValue()); + assertEquals("details", redisHashSetFieldEntry.getField()); + assertEquals("Test-test-order", redisHashSetFieldEntry.getKey()); + } + + @Test + public void shouldParseStringMessageWithSpacesForCollectionKeyTemplate() { + setRedisSinkConfig("message", "Test-%s, 1", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForTestMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("3", "details")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForTestMessage, testMessageProtoParser, redisSinkConfig, statsDReporter); + RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(message).get(0); + + assertEquals("ORDER-DETAILS", redisHashSetFieldEntry.getValue()); + assertEquals("details", redisHashSetFieldEntry.getField()); + assertEquals("Test-test-order", redisHashSetFieldEntry.getKey()); + } + + + @Test + public void shouldParseFloatMessageForCollectionKeyTemplate() { + setRedisSinkConfig("message", "Test-%.2f,16", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); + + assertEquals("Test-12.30", redisHashSetFieldEntry.getKey()); + assertEquals("order_number_1", redisHashSetFieldEntry.getField()); + assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); + } + + @Test + public void shouldParseLongMessageForCollectionKeyTemplate() { + setRedisSinkConfig("message", "Test-%d,52", 
RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); + + assertEquals("Test-2000", redisHashSetFieldEntry.getKey()); + assertEquals("order_number_1", redisHashSetFieldEntry.getField()); + assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); + } + + @Test + public void shouldThrowExceptionForInvalidPatternInCollectionKeyTemplate() { + expectedException.expect(UnknownFormatConversionException.class); + expectedException.expectMessage("Conversion = '%'"); + + setRedisSinkConfig("message", "Test-%,52", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + redisMessageParser.parse(bookingMessage); + } + + @Test + public void shouldThrowExceptionForIncompatiblePatternInCollectionKeyTemplate() { + expectedException.expect(IllegalFormatConversionException.class); + expectedException.expectMessage("f != java.lang.Long"); + + setRedisSinkConfig("message", "Test-%f,52", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + redisMessageParser.parse(bookingMessage); + } + + @Test + public void 
shouldThrowExceptionForNonExistingDescriptorInCollectionKeyTemplate() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Descriptor not found for index: 20000"); + + setRedisSinkConfig("message", "Test-%f,20000", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + redisMessageParser.parse(bookingMessage); + } + + @Test + public void shouldThrowExceptionForNullCollectionKeyTemplate() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Template 'null' is invalid"); + + setRedisSinkConfig("message", null, RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + redisMessageParser.parse(bookingMessage); + } + + @Test + public void shouldThrowExceptionForEmptyCollectionKeyTemplate() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Template '' is invalid"); + + setRedisSinkConfig("message", "", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + redisMessageParser.parse(bookingMessage); + } + + @Test + public void shouldAcceptStringForCollectionKey() { + setRedisSinkConfig("message", "Test", 
RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); + + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); + assertEquals("Test", redisHashSetFieldEntry.getKey()); + assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); + } + + @Test + public void shouldAcceptStringWithPatternForCollectionKeyWithEmptyVariables() { + setRedisSinkConfig("message", "Test-%s", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); + + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); + assertEquals("Test-%s", redisHashSetFieldEntry.getKey()); + assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); + } + + @Test + public void shouldParseLongMessageForKey() { + setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_%d,52")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); + + assertEquals("Test-2000", redisHashSetFieldEntry.getKey()); + assertEquals(String.format("order_number_%s", 
bookingCustomerTotalFare), redisHashSetFieldEntry.getField()); + assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); + } + + @Test + public void shouldParseLongMessageWithSpaceForKey() { + setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_%d, 52")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); + + assertEquals("Test-2000", redisHashSetFieldEntry.getKey()); + assertEquals(String.format("order_number_%s", bookingCustomerTotalFare), redisHashSetFieldEntry.getField()); + assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); + } + + @Test + public void shouldParseStringMessageForKey() { + setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_%s,2")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); + + assertEquals("Test-2000", redisHashSetFieldEntry.getKey()); + assertEquals(String.format("order_number_%s", bookingOrderNumber), redisHashSetFieldEntry.getField()); + assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); + } + + @Test + public void shouldHandleStaticStringForKey() { + setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new 
ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); + assertEquals("Test-2000", redisHashSetFieldEntry.getKey()); + assertEquals("order_number", redisHashSetFieldEntry.getField()); + assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); + } + + @Test + public void shouldHandleStaticStringWithPatternForKey() { + setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number%s")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); + assertEquals("Test-2000", redisHashSetFieldEntry.getKey()); + assertEquals("order_number%s", redisHashSetFieldEntry.getField()); + assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); + } + + @Test + public void shouldThrowErrorForInvalidFormatForKey() { + expectedException.expect(UnknownFormatConversionException.class); + expectedException.expectMessage("Conversion = '%"); + + setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number-%,52")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + redisMessageParser.parse(bookingMessage); + } + + @Test + public void 
shouldThrowErrorForIncompatibleFormatForKey() { + expectedException.expect(IllegalFormatConversionException.class); + expectedException.expectMessage("d != java.lang.String"); + + setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number-%d,2")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(message).get(0); + + assertEquals("ORDER-DETAILS", redisHashSetFieldEntry.getValue()); + assertEquals("details", redisHashSetFieldEntry.getField()); + assertEquals("Test-test-order", redisHashSetFieldEntry.getKey()); + } + + @Test + public void shouldThrowExceptionForEmptyKey() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Template '' is invalid"); + + setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + redisMessageParser.parse(bookingMessage); + } + + @Test + public void shouldParseKeyWhenKafkaMessageParseModeSetToKey() { + setRedisSinkConfig("key", "Test-%s,1", RedisSinkDataType.HASHSET); + ProtoToFieldMapper protoToFieldMapperForKey = new ProtoToFieldMapper(testKeyProtoParser, getProperties("1", "order")); + + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForKey, testKeyProtoParser, redisSinkConfig, statsDReporter); + RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) 
redisMessageParser.parse(bookingMessage).get(0); + + assertEquals(redisHashSetFieldEntry.getValue(), "ORDER-1-FROM-KEY"); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowInvalidProtocolBufferExceptionWhenIncorrectProtocolUsed() { + setRedisSinkConfig("message", "Test-%s,1", RedisSinkDataType.HASHSET); + Parser protoParserForTest = stencilClient.getParser(TestNestedRepeatedMessage.class.getCanonicalName()); + ProtoToFieldMapper protoToFieldMapperForTest = new ProtoToFieldMapper(protoParserForTest, getProperties("3", "details")); + RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForTest, protoParserForTest, redisSinkConfig, statsDReporter); + + redisMessageParser.parse(message); + } + + private Properties getProperties(String s, String order) { + Properties propertiesForKey = new Properties(); + propertiesForKey.setProperty(s, order); + return propertiesForKey; + } +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParserTest.java b/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParserTest.java new file mode 100644 index 000000000..3a414b2d2 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParserTest.java @@ -0,0 +1,97 @@ +package io.odpf.firehose.sink.redis.parsers; + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.consumer.TestKey; +import io.odpf.firehose.consumer.TestMessage; +import io.odpf.firehose.message.Message; +import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; +import io.odpf.firehose.sink.redis.dataentry.RedisKeyValueEntry; +import io.odpf.stencil.Parser; +import io.odpf.stencil.client.ClassLoadStencilClient; +import io.odpf.stencil.client.StencilClient; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Arrays.asList; 
+import static java.util.Collections.singletonMap; +import static org.aeonbits.owner.ConfigFactory.create; +import static org.junit.Assert.assertEquals; + +public class RedisKeyValueParserTest { + + private final byte[] testKeyByteArr = TestKey.newBuilder() + .setOrderNumber("order-2") + .setOrderUrl("order-url-world") + .build() + .toByteArray(); + private StatsDReporter statsDReporter; + private StencilClient stencilClient = new ClassLoadStencilClient(); + private Parser testMessageProtoParser = stencilClient.getParser(TestMessage.class.getCanonicalName()); + private Parser testKeyProtoParser = stencilClient.getParser(TestKey.class.getCanonicalName()); + + @Test + public void parse() { + Map config = new HashMap() {{ + put("KAFKA_RECORD_PARSER_MODE", "message"); + put("SINK_REDIS_KEY_TEMPLATE", "hello_world_%%s,1"); + put("SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX", "3"); + }}; + RedisSinkConfig redisSinkConfig = create(RedisSinkConfig.class, config); + RedisKeyValueParser redisKeyValueParser = new RedisKeyValueParser(testMessageProtoParser, redisSinkConfig, statsDReporter); + byte[] logMessage = TestMessage.newBuilder() + .setOrderNumber("xyz-order") + .setOrderDetails("new-eureka-order") + .build() + .toByteArray(); + Message message = new Message(null, logMessage, "test-topic", 1, 100); + List redisDataEntries = redisKeyValueParser.parse(message); + + RedisKeyValueEntry expectedEntry = new RedisKeyValueEntry("hello_world_xyz-order", "new-eureka-order", null); + assertEquals(asList(expectedEntry), redisDataEntries); + + } + + @Test + public void shouldParseWhenUsingModeKey() { + Map config = new HashMap() {{ + put("KAFKA_RECORD_PARSER_MODE", "key"); + put("SINK_REDIS_KEY_TEMPLATE", "hello_world_%%s,1"); + put("SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX", "2"); + }}; + RedisSinkConfig redisSinkConfig = create(RedisSinkConfig.class, config); + + RedisKeyValueParser redisKeyValueParser = new RedisKeyValueParser(testKeyProtoParser, redisSinkConfig, null); + Message message 
= new Message(testKeyByteArr, null, null, 0, 0L); + redisKeyValueParser.parse(message); + } + + @Test + public void shouldThrowExceptionWhenKeyTemplateIsEmpty() { + + Message message = new Message(testKeyByteArr, testKeyByteArr, "", 0, 0); + RedisSinkConfig redisSinkConfig = create(RedisSinkConfig.class, singletonMap("SINK_REDIS_KEY_TEMPLATE", "")); + RedisKeyValueParser redisKeyValueParser = new RedisKeyValueParser(testKeyProtoParser, redisSinkConfig, null); + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows(IllegalArgumentException.class, () -> redisKeyValueParser.parse(message)); + assertEquals("Template '' is invalid", illegalArgumentException.getMessage()); + } + + @Test + public void shouldThrowExceptionForNoListProtoIndex() { + HashMap config = new HashMap() {{ + put("SINK_REDIS_KEY_TEMPLATE", "hello_world%%s,1"); + }}; + RedisSinkConfig redisSinkConfig = create(RedisSinkConfig.class, config); + + Message message = new Message(testKeyByteArr, testKeyByteArr, "", 0, 0); + RedisKeyValueParser redisKeyValueParser = new RedisKeyValueParser(testKeyProtoParser, redisSinkConfig, null); + IllegalArgumentException illegalArgumentException = Assertions.assertThrows(IllegalArgumentException.class, + () -> redisKeyValueParser.parse(message)); + assertEquals("Please provide SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX in key value sink", illegalArgumentException.getMessage()); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisListParserTest.java b/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisListParserTest.java new file mode 100644 index 000000000..c2856efa2 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisListParserTest.java @@ -0,0 +1,107 @@ +package io.odpf.firehose.sink.redis.parsers; + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.config.enums.RedisSinkDataType; +import io.odpf.firehose.message.Message; +import 
io.odpf.firehose.consumer.TestBookingLogMessage; +import io.odpf.firehose.consumer.TestKey; +import io.odpf.firehose.consumer.TestMessage; +import io.odpf.firehose.sink.redis.dataentry.RedisListEntry; +import io.odpf.stencil.client.ClassLoadStencilClient; +import io.odpf.stencil.Parser; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static junit.framework.TestCase.assertEquals; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RedisListParserTest { + private final long bookingCustomerTotalFare = 2000L; + private final float bookingAmountPaidByCash = 12.3F; + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Mock + private RedisSinkConfig redisSinkConfig; + + @Mock + private StatsDReporter statsDReporter; + + private Message message; + private Parser testKeyProtoParser; + private Parser testMessageProtoParser; + private ClassLoadStencilClient stencilClient; + private Message bookingMessage; + private Parser bookingMessageProtoParser; + private String bookingOrderNumber = "booking-order-1"; + + @Before + public void setUp() throws Exception { + + TestKey testKey = TestKey.newBuilder().setOrderNumber("ORDER-1-FROM-KEY").build(); + TestBookingLogMessage testBookingLogMessage = TestBookingLogMessage.newBuilder().setOrderNumber(bookingOrderNumber).setCustomerTotalFareWithoutSurge(bookingCustomerTotalFare).setAmountPaidByCash(bookingAmountPaidByCash).build(); + TestMessage testMessage = TestMessage.newBuilder().setOrderNumber("test-order").setOrderDetails("ORDER-DETAILS").build(); + this.message = new Message(testKey.toByteArray(), testMessage.toByteArray(), "test", 1, 11); + this.bookingMessage = new Message(testKey.toByteArray(), testBookingLogMessage.toByteArray(), "test", 1, 11); + stencilClient = new 
ClassLoadStencilClient(); + testMessageProtoParser = stencilClient.getParser(TestMessage.class.getCanonicalName()); + bookingMessageProtoParser = stencilClient.getParser(TestBookingLogMessage.class.getCanonicalName()); + testKeyProtoParser = stencilClient.getParser(TestKey.class.getCanonicalName()); + } + + private void setRedisSinkConfig(String parserMode, String collectionKeyTemplate, RedisSinkDataType redisSinkDataType) { + when(redisSinkConfig.getKafkaRecordParserMode()).thenReturn(parserMode); + when(redisSinkConfig.getSinkRedisKeyTemplate()).thenReturn(collectionKeyTemplate); + when(redisSinkConfig.getSinkRedisListDataProtoIndex()).thenReturn("1"); + } + + @Test + public void shouldParseStringMessageForCollectionKeyTemplateInList() { + setRedisSinkConfig("message", "Test-%s,1", RedisSinkDataType.LIST); + RedisParser redisParser = new RedisListParser(testMessageProtoParser, redisSinkConfig, statsDReporter); + + RedisListEntry redisListEntry = (RedisListEntry) redisParser.parse(message).get(0); + + assertEquals("test-order", redisListEntry.getValue()); + assertEquals("Test-test-order", redisListEntry.getKey()); + } + + @Test + public void shouldParseKeyWhenKafkaMessageParseModeSetToKey() { + setRedisSinkConfig("key", "Test-%s,1", RedisSinkDataType.LIST); + + RedisParser redisParser = new RedisListParser(testKeyProtoParser, redisSinkConfig, statsDReporter); + RedisListEntry redisListEntry = (RedisListEntry) redisParser.parse(bookingMessage).get(0); + + assertEquals(redisListEntry.getValue(), "ORDER-1-FROM-KEY"); + } + + @Test + public void shouldThrowExceptionForEmptyKey() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Template '' is invalid"); + + setRedisSinkConfig("message", "", RedisSinkDataType.LIST); + RedisParser redisParser = new RedisListParser(bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + redisParser.parse(bookingMessage); + } + + @Test + public void 
shouldThrowExceptionForNoListProtoIndex() { + setRedisSinkConfig("message", "Test-%s,1", RedisSinkDataType.LIST); + when(redisSinkConfig.getSinkRedisListDataProtoIndex()).thenReturn(null); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Please provide SINK_REDIS_LIST_DATA_PROTO_INDEX in list sink"); + + RedisParser redisParser = new RedisListParser(bookingMessageProtoParser, redisSinkConfig, statsDReporter); + + redisParser.parse(bookingMessage); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactoryTest.java b/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactoryTest.java new file mode 100644 index 000000000..0fc00930b --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactoryTest.java @@ -0,0 +1,84 @@ +package io.odpf.firehose.sink.redis.parsers; + + +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.config.enums.RedisSinkDataType; +import io.odpf.firehose.consumer.TestMessage; +import io.odpf.firehose.proto.ProtoToFieldMapper; +import io.odpf.stencil.Parser; +import io.odpf.stencil.client.ClassLoadStencilClient; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + + +@RunWith(MockitoJUnitRunner.class) +public class RedisParserFactoryTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Mock + private RedisSinkConfig redisSinkConfig; + + @Mock + private StatsDReporter statsDReporter; + + private ClassLoadStencilClient stencilClient; + private ProtoToFieldMapper protoToFieldMapper; + private Parser testMessageProtoParser; + + + @Before + public 
void setUp() throws Exception { + stencilClient = new ClassLoadStencilClient(); + testMessageProtoParser = stencilClient.getParser(TestMessage.class.getCanonicalName()); + protoToFieldMapper = new ProtoToFieldMapper(testMessageProtoParser, getProperties("3", "details")); + } + + private void setRedisSinkConfig(RedisSinkDataType redisSinkDataType) { + when(redisSinkConfig.getSinkRedisDataType()).thenReturn(redisSinkDataType); + } + + @Test + public void shouldReturnNewRedisListParser() { + setRedisSinkConfig(RedisSinkDataType.LIST); + + RedisParser parser = RedisParserFactory.getParser(protoToFieldMapper, testMessageProtoParser, redisSinkConfig, statsDReporter); + + assertEquals(RedisListParser.class, parser.getClass()); + } + + @Test + public void shouldReturnNewRedisHashSetParser() { + setRedisSinkConfig(RedisSinkDataType.HASHSET); + + RedisParser parser = RedisParserFactory.getParser(protoToFieldMapper, testMessageProtoParser, redisSinkConfig, statsDReporter); + + assertEquals(RedisHashSetParser.class, parser.getClass()); + } + + @Test + public void shouldReturnNewRedisKeyValueParser() { + setRedisSinkConfig(RedisSinkDataType.KEYVALUE); + + RedisParser parser = RedisParserFactory.getParser(protoToFieldMapper, testMessageProtoParser, redisSinkConfig, statsDReporter); + + assertEquals(RedisKeyValueParser.class, parser.getClass()); + } + + private Properties getProperties(String s, String order) { + Properties propertiesForKey = new Properties(); + propertiesForKey.setProperty(s, order); + return propertiesForKey; + } +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/ttl/DurationTTLTest.java b/src/test/java/io/odpf/firehose/sink/redis/ttl/DurationTTLTest.java new file mode 100644 index 000000000..2ff62c958 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/ttl/DurationTTLTest.java @@ -0,0 +1,41 @@ +package io.odpf.firehose.sink.redis.ttl; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import 
org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) +public class DurationTTLTest { + + private DurationTtl durationTTL; + + @Mock + private Pipeline pipeline; + + @Mock + private JedisCluster jedisCluster; + + @Before + public void setup() { + durationTTL = new DurationTtl(10); + } + + @Test + public void shouldSetTTLInSecondsForPipeline() { + durationTTL.setTtl(pipeline, "test-key"); + verify(pipeline, times(1)).expire("test-key", 10); + } + + @Test + public void shouldSetTTLInSecondsForCluster() { + durationTTL.setTtl(jedisCluster, "test-key"); + verify(jedisCluster, times(1)).expire("test-key", 10); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTTLTest.java b/src/test/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTTLTest.java new file mode 100644 index 000000000..ba2475cd8 --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTTLTest.java @@ -0,0 +1,39 @@ +package io.odpf.firehose.sink.redis.ttl; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; + +public class ExactTimeTTLTest { + + private ExactTimeTtl exactTimeTTL; + @Mock + private Pipeline pipeline; + + @Mock + private JedisCluster jedisCluster; + + @Before + public void setup() { + initMocks(this); + exactTimeTTL = new ExactTimeTtl(10000000L); + } + + @Test + public void shouldSetUnixTimeStampAsTTLForPipeline() { + exactTimeTTL.setTtl(pipeline, "test-key"); + verify(pipeline, times(1)).expireAt("test-key", 10000000L); + } + + @Test + public void shouldSetUnixTimeStampAsTTLForCluster() { + 
exactTimeTTL.setTtl(jedisCluster, "test-key"); + verify(jedisCluster, times(1)).expireAt("test-key", 10000000L); + } +} diff --git a/src/test/java/io/odpf/firehose/sink/redis/ttl/RedisTtlFactoryTest.java b/src/test/java/io/odpf/firehose/sink/redis/ttl/RedisTtlFactoryTest.java new file mode 100644 index 000000000..2b316cc9f --- /dev/null +++ b/src/test/java/io/odpf/firehose/sink/redis/ttl/RedisTtlFactoryTest.java @@ -0,0 +1,61 @@ +package io.odpf.firehose.sink.redis.ttl; + +import io.odpf.firehose.config.RedisSinkConfig; +import io.odpf.firehose.config.enums.RedisSinkTtlType; +import io.odpf.firehose.exception.ConfigurationException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mock; + +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class RedisTtlFactoryTest { + + @Mock + private RedisSinkConfig redisSinkConfig; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setup() { + initMocks(this); + when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DISABLE); + } + + @Test + public void shouldReturnNoTTLIfNothingGiven() { + RedisTtl redisTTL = RedisTTLFactory.getTTl(redisSinkConfig); + Assert.assertEquals(redisTTL.getClass(), NoRedisTtl.class); + } + + @Test + public void shouldReturnExactTimeTTL() { + when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.EXACT_TIME); + when(redisSinkConfig.getSinkRedisTtlValue()).thenReturn(100L); + RedisTtl redisTTL = RedisTTLFactory.getTTl(redisSinkConfig); + Assert.assertEquals(redisTTL.getClass(), ExactTimeTtl.class); + } + + @Test + public void shouldReturnDurationTTL() { + when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); + when(redisSinkConfig.getSinkRedisTtlValue()).thenReturn(100L); + RedisTtl redisTTL = 
RedisTTLFactory.getTTl(redisSinkConfig); + Assert.assertEquals(redisTTL.getClass(), DurationTtl.class); + } + + @Test + public void shouldThrowExceptionInCaseOfInvalidConfiguration() { + expectedException.expect(ConfigurationException.class); + expectedException.expectMessage("Provide a positive TTL value"); + + when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); + when(redisSinkConfig.getSinkRedisTtlValue()).thenReturn(-1L); + RedisTTLFactory.getTTl(redisSinkConfig); + } +}