Skip to content

Commit

Permalink
feature:add key value support for redis sink (#151)
Browse files Browse the repository at this point in the history
* feature:add key value support for redis sink

* refactor: implement code review comments

Co-authored-by: kevin.bheda <[email protected]>
  • Loading branch information
kevinbheda and kevinbhedag authored Feb 1, 2022
1 parent fef57ca commit f020f14
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 14 deletions.
10 changes: 10 additions & 0 deletions docs/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,16 @@ This field decides what all data will be stored in the List for each message.

* Type: `required (For List)`

### `SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX`

This field decides what data will be stored in the value part of key-value pair

* Example value: `6`

This will get the value of the field with index 6 in your proto and push that to the Redis as value with the corresponding keyTemplate\

* Type: `required (For KeyValue)`

### `SINK_REDIS_TTL_TYPE`

* Example value: `DURATION`
Expand Down
10 changes: 10 additions & 0 deletions docs/reference/configuration/redis-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ This field decides what all data will be stored in the List for each message.

* Type: `required (For List)`

### `SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX`

This field decides what data will be stored in the value part of key-value pair

* Example value: `6`

This will get the value of the field with index 6 in your proto and push that to the Redis as value with the corresponding keyTemplate\

* Type: `required (For KeyValue)`

## `SINK_REDIS_TTL_TYPE`

* Example value: `DURATION`
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/odpf/firehose/config/RedisSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ public interface RedisSinkConfig extends AppConfig {
@Key("SINK_REDIS_LIST_DATA_PROTO_INDEX")
String getSinkRedisListDataProtoIndex();

@Key("SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX")
String getSinkRedisKeyValuetDataProtoIndex();

@Key("SINK_REDIS_TTL_TYPE")
@DefaultValue("DISABLE")
@ConverterClass(RedisSinkTtlTypeConverter.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

public enum RedisSinkDataType {
LIST,
HASHSET
HASHSET,
KEYVALUE,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.odpf.firehose.sink.redis.dataentry;

import io.odpf.firehose.metrics.Instrumentation;
import io.odpf.firehose.sink.redis.ttl.RedisTtl;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Pipeline;

@AllArgsConstructor
@Getter
@EqualsAndHashCode
public class RedisKeyValueEntry implements RedisDataEntry {

private String key;
private String value;
@EqualsAndHashCode.Exclude private Instrumentation instrumentation;

@Override
public void pushMessage(Pipeline jedisPipelined, RedisTtl redisTTL) {
redisTTL.setTtl(jedisPipelined, key);
instrumentation.logDebug("key: {}, value: {}", key, value);
jedisPipelined.set(key, value);
}

@Override
public void pushMessage(JedisCluster jedisCluster, RedisTtl redisTTL) {
redisTTL.setTtl(jedisCluster, key);
instrumentation.logDebug("key: {}, value: {}", key, value);
jedisCluster.set(key, value);

}

@Override
public String toString() {
return "RedisKeyValueEntry{"
+
"key='"
+ key
+ '\''
+
", value='" + value
+ '\''
+
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.odpf.firehose.sink.redis.parsers;

import com.google.protobuf.DynamicMessage;
import io.odpf.firehose.config.RedisSinkConfig;
import io.odpf.firehose.message.Message;
import io.odpf.firehose.metrics.Instrumentation;
import io.odpf.firehose.metrics.StatsDReporter;
import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry;
import io.odpf.firehose.sink.redis.dataentry.RedisKeyValueEntry;
import io.odpf.stencil.Parser;

import java.util.Collections;
import java.util.List;

public class RedisKeyValueParser extends RedisParser {
private RedisSinkConfig redisSinkConfig;
private StatsDReporter statsDReporter;

public RedisKeyValueParser(Parser protoParser, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) {
super(protoParser, redisSinkConfig);
this.redisSinkConfig = redisSinkConfig;
this.statsDReporter = statsDReporter;
}

@Override
public List<RedisDataEntry> parse(Message message) {
DynamicMessage parsedMessage = parseEsbMessage(message);
String redisKey = parseTemplate(parsedMessage, redisSinkConfig.getSinkRedisKeyTemplate());
String protoIndex = redisSinkConfig.getSinkRedisKeyValuetDataProtoIndex();
if (protoIndex == null) {
throw new IllegalArgumentException("Please provide SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX in key value sink");
}
Instrumentation instrumentation = new Instrumentation(statsDReporter, RedisKeyValueEntry.class);
RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(redisKey, getDataByFieldNumber(parsedMessage, protoIndex).toString(), instrumentation);
return Collections.singletonList(redisKeyValueEntry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@


import io.odpf.firehose.config.RedisSinkConfig;
import io.odpf.firehose.config.enums.RedisSinkDataType;
import io.odpf.firehose.metrics.StatsDReporter;
import io.odpf.firehose.proto.ProtoToFieldMapper;
import io.odpf.stencil.Parser;
Expand All @@ -23,13 +22,15 @@ public class RedisParserFactory {
*/
public static RedisParser getParser(ProtoToFieldMapper protoToFieldMapper, Parser protoParser, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) {

RedisParser redisParser;

if (redisSinkConfig.getSinkRedisDataType().equals(RedisSinkDataType.LIST)) {
redisParser = new RedisListParser(protoParser, redisSinkConfig, statsDReporter);
} else {
redisParser = new RedisHashSetParser(protoToFieldMapper, protoParser, redisSinkConfig, statsDReporter);
switch (redisSinkConfig.getSinkRedisDataType()) {
case LIST:
return new RedisListParser(protoParser, redisSinkConfig, statsDReporter);
case HASHSET:
return new RedisHashSetParser(protoToFieldMapper, protoParser, redisSinkConfig, statsDReporter);
case KEYVALUE:
return new RedisKeyValueParser(protoParser, redisSinkConfig, statsDReporter);
default:
return null;
}
return redisParser;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.odpf.firehose.sink.redis.dataentry;

import io.odpf.firehose.metrics.Instrumentation;
import io.odpf.firehose.sink.redis.ttl.DurationTtl;
import io.odpf.firehose.sink.redis.ttl.NoRedisTtl;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Pipeline;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class RedisKeyValueEntryTest {
@Mock
private Instrumentation instrumentation;

@Mock
private Pipeline pipeline;

@Mock
private JedisCluster jedisCluster;

@Before
public void setup() {
MockitoAnnotations.initMocks(this);

}

@Test
public void pushMessageWithNoTtl() {
String key = "key";
String value = "value";
RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, instrumentation);
redisKeyValueEntry.pushMessage(pipeline, new NoRedisTtl());
verify(pipeline, times(1)).set(key, value);
verify(pipeline, times(0)).expireAt(any(String.class), any(Long.class));

}

@Test
public void pushMessageWithTtl() {
String key = "key";
String value = "value";
RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, instrumentation);
redisKeyValueEntry.pushMessage(pipeline, new DurationTtl(100));
verify(pipeline, times(1)).set(key, value);
verify(pipeline, times(1)).expire(key, 100);
}

@Test
public void pushMessageVerifyInstrumentation() {
String key = "this-key";
String value = "john";
RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, instrumentation);
redisKeyValueEntry.pushMessage(pipeline, new DurationTtl(100));
verify(instrumentation, times(1)).logDebug("key: {}, value: {}", key, value);
}


@Test
public void pushMessageWithNoTtlUsingJedisCluster() {
String key = "key";
String value = "value";
RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, instrumentation);
redisKeyValueEntry.pushMessage(jedisCluster, new NoRedisTtl());
verify(jedisCluster, times(1)).set(key, value);
verify(jedisCluster, times(0)).expireAt(any(String.class), any(Long.class));

}

@Test
public void pushMessageWithTtlUsingJedisCluster() {
String key = "key";
String value = "value";
RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, instrumentation);
redisKeyValueEntry.pushMessage(jedisCluster, new DurationTtl(100));
verify(jedisCluster, times(1)).set(key, value);
verify(jedisCluster, times(1)).expire(key, 100);
}

@Test
public void pushMessageVerifyInstrumentationUsingJedisCluster() {
String key = "this-key";
String value = "john";
RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, instrumentation);
redisKeyValueEntry.pushMessage(jedisCluster, new DurationTtl(100));
verify(instrumentation, times(1)).logDebug("key: {}, value: {}", key, value);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.odpf.firehose.sink.redis.parsers;

import io.odpf.firehose.config.RedisSinkConfig;
import io.odpf.firehose.consumer.TestKey;
import io.odpf.firehose.consumer.TestMessage;
import io.odpf.firehose.message.Message;
import io.odpf.firehose.metrics.StatsDReporter;
import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry;
import io.odpf.firehose.sink.redis.dataentry.RedisKeyValueEntry;
import io.odpf.stencil.Parser;
import io.odpf.stencil.client.ClassLoadStencilClient;
import io.odpf.stencil.client.StencilClient;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonMap;
import static org.aeonbits.owner.ConfigFactory.create;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

public class RedisKeyValueParserTest {

private final byte[] testKeyByteArr = TestKey.newBuilder()
.setOrderNumber("order-2")
.setOrderUrl("order-url-world")
.build()
.toByteArray();
private StatsDReporter statsDReporter;
private StencilClient stencilClient = new ClassLoadStencilClient();
private Parser testMessageProtoParser = stencilClient.getParser(TestMessage.class.getCanonicalName());
private Parser testKeyProtoParser = stencilClient.getParser(TestKey.class.getCanonicalName());

@Test
public void parse() {
Map<String, String> config = new HashMap<String, String>() {{
put("KAFKA_RECORD_PARSER_MODE", "message");
put("SINK_REDIS_KEY_TEMPLATE", "hello_world_%%s,1");
put("SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX", "3");
}};
RedisSinkConfig redisSinkConfig = create(RedisSinkConfig.class, config);
RedisKeyValueParser redisKeyValueParser = new RedisKeyValueParser(testMessageProtoParser, redisSinkConfig, statsDReporter);
byte[] logMessage = TestMessage.newBuilder()
.setOrderNumber("xyz-order")
.setOrderDetails("new-eureka-order")
.build()
.toByteArray();
Message message = new Message(null, logMessage, "test-topic", 1, 100);
List<RedisDataEntry> redisDataEntries = redisKeyValueParser.parse(message);

RedisKeyValueEntry expectedEntry = new RedisKeyValueEntry("hello_world_xyz-order", "new-eureka-order", null);
assertEquals(asList(expectedEntry), redisDataEntries);

}

@Test
public void shouldParseWhenUsingModeKey() {
Map<String, String> config = new HashMap<String, String>() {{
put("KAFKA_RECORD_PARSER_MODE", "key");
put("SINK_REDIS_KEY_TEMPLATE", "hello_world_%%s,1");
put("SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX", "2");
}};
RedisSinkConfig redisSinkConfig = create(RedisSinkConfig.class, config);

RedisKeyValueParser redisKeyValueParser = new RedisKeyValueParser(testKeyProtoParser, redisSinkConfig, null);
Message message = new Message(testKeyByteArr, null, null, 0, 0L);
redisKeyValueParser.parse(message);
}

@Test
public void shouldThrowExceptionWhenKeyTemplateIsEmpty() {

Message message = new Message(testKeyByteArr, testKeyByteArr, "", 0, 0);
RedisSinkConfig redisSinkConfig = create(RedisSinkConfig.class, singletonMap("SINK_REDIS_KEY_TEMPLATE", ""));
RedisKeyValueParser redisKeyValueParser = new RedisKeyValueParser(testKeyProtoParser, redisSinkConfig, null);
IllegalArgumentException illegalArgumentException =
assertThrows(IllegalArgumentException.class, () -> redisKeyValueParser.parse(message));
assertEquals("Template '' is invalid", illegalArgumentException.getMessage());
}

@Test
public void shouldThrowExceptionForNoListProtoIndex() {
HashMap<String, String> config = new HashMap<String, String>() {{
put("SINK_REDIS_KEY_TEMPLATE", "hello_world%%s,1");
}};
RedisSinkConfig redisSinkConfig = create(RedisSinkConfig.class, config);

Message message = new Message(testKeyByteArr, testKeyByteArr, "", 0, 0);
RedisKeyValueParser redisKeyValueParser = new RedisKeyValueParser(testKeyProtoParser, redisSinkConfig, null);
IllegalArgumentException illegalArgumentException = assertThrows(IllegalArgumentException.class,
() -> redisKeyValueParser.parse(message));
assertEquals("Please provide SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX in key value sink", illegalArgumentException.getMessage());
}
}
Loading

0 comments on commit f020f14

Please sign in to comment.