diff --git a/build.gradle b/build.gradle index 733e6e33c..09f4d0d81 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'io.odpf' -version '0.7.1' +version '0.7.2' def projName = "firehose" diff --git a/docs/docs/advance/generic.md b/docs/docs/advance/generic.md index 308e33913..b18238575 100644 Binary files a/docs/docs/advance/generic.md and b/docs/docs/advance/generic.md differ diff --git a/src/main/java/io/odpf/firehose/config/KafkaConsumerConfig.java b/src/main/java/io/odpf/firehose/config/KafkaConsumerConfig.java index 4b8ef66f0..819d96c88 100644 --- a/src/main/java/io/odpf/firehose/config/KafkaConsumerConfig.java +++ b/src/main/java/io/odpf/firehose/config/KafkaConsumerConfig.java @@ -52,4 +52,9 @@ public interface KafkaConsumerConfig extends AppConfig { @Key("SOURCE_KAFKA_CONSUMER_CONFIG_MANUAL_COMMIT_MIN_INTERVAL_MS") @DefaultValue("-1") long getSourceKafkaConsumerManualCommitMinIntervalMs(); + + @Key("SOURCE_KAFKA_CONSUMER_CONFIG_PARTITION_ASSIGNMENT_STRATEGY") + @DefaultValue("org.apache.kafka.clients.consumer.CooperativeStickyAssignor") + String getSourceKafkaConsumerConfigPartitionAssignmentStrategy(); + } diff --git a/src/main/java/io/odpf/firehose/utils/KafkaUtils.java b/src/main/java/io/odpf/firehose/utils/KafkaUtils.java index 4654e5751..c0043d1bc 100644 --- a/src/main/java/io/odpf/firehose/utils/KafkaUtils.java +++ b/src/main/java/io/odpf/firehose/utils/KafkaUtils.java @@ -30,6 +30,8 @@ public class KafkaUtils { private static final String METADATA_MAX_AGE_MS = "metadata.max.age.ms"; private static final String MAX_POLL_RECORDS = "max.poll.records"; private static final String SESSION_TIMEOUT_MS = "session.timeout.ms"; + private static final String PARTITION_ASSIGNMENT_STRATEGY = "partition.assignment.strategy"; + /** @@ -56,6 +58,7 @@ public static Map getConfig(KafkaConsumerConfig config, Map