Skip to content

Commit

Permalink
feat: add config for kafka consumer partition assignment strategy (#209)
Browse files Browse the repository at this point in the history
* feat: add config for kafka consumer partition assignment strategy

* chore: version bump

* chore: bump version to 0.7.1

* chore: version bump
  • Loading branch information
sumitaich1998 authored Jan 18, 2023
1 parent 01f688d commit f95c194
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'io.odpf'
version '0.7.1'
version '0.7.2'

def projName = "firehose"

Expand Down
Binary file modified docs/docs/advance/generic.md
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,9 @@ public interface KafkaConsumerConfig extends AppConfig {
@Key("SOURCE_KAFKA_CONSUMER_CONFIG_MANUAL_COMMIT_MIN_INTERVAL_MS")
@DefaultValue("-1")
long getSourceKafkaConsumerManualCommitMinIntervalMs();

@Key("SOURCE_KAFKA_CONSUMER_CONFIG_PARTITION_ASSIGNMENT_STRATEGY")
@DefaultValue("org.apache.kafka.clients.consumer.CooperativeStickyAssignor")
String getSourceKafkaConsumerConfigPartitionAssignmentStrategy();

}
3 changes: 3 additions & 0 deletions src/main/java/io/odpf/firehose/utils/KafkaUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class KafkaUtils {
private static final String METADATA_MAX_AGE_MS = "metadata.max.age.ms";
private static final String MAX_POLL_RECORDS = "max.poll.records";
private static final String SESSION_TIMEOUT_MS = "session.timeout.ms";
private static final String PARTITION_ASSIGNMENT_STRATEGY = "partition.assignment.strategy";



/**
Expand All @@ -56,6 +58,7 @@ public static Map<String, Object> getConfig(KafkaConsumerConfig config, Map<Stri
put(METADATA_MAX_AGE_MS, config.getSourceKafkaConsumerConfigMetadataMaxAgeMs());
put(MAX_POLL_RECORDS, config.getSourceKafkaConsumerConfigMaxPollRecords());
put(SESSION_TIMEOUT_MS, config.getSourceKafkaConsumerConfigSessionTimeoutMs());
put(PARTITION_ASSIGNMENT_STRATEGY, config.getSourceKafkaConsumerConfigPartitionAssignmentStrategy());
}};

return merge(consumerConfigurationMap, KafkaEnvironmentVariables.parse(extraParameters));
Expand Down

0 comments on commit f95c194

Please sign in to comment.