Skip to content

Commit

Permalink
fix: npe when getting topic configs (#6946)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang authored Feb 4, 2021
1 parent 96288e4 commit 5e026d4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ private Map<String, String> topicConfig(
() -> adminClient.get().describeConfigs(request).all().get(),
ExecutorUtil.RetryBehaviour.ON_RETRYABLE).get(resource);
return config.entries().stream()
.filter(e -> e.value() != null)
.filter(e -> includeDefaults || !e.isDefault())
.collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,23 @@ public void shouldGetTopicConfig() {
assertThat(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), is("1"));
}

@Test
public void shouldNotFailWhenTopicConfigValueIsNull() {
// Given:
givenTopicConfigs(
"fred",
overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "12345"),
overriddenConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, null)
);

// When:
final Map<String, String> config = kafkaTopicClient.getTopicConfig("fred");

// Then:
assertThat(config.get(TopicConfig.RETENTION_MS_CONFIG), is("12345"));
assertThat(config.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), is(false));
}

@Test
public void shouldGetTopicCleanUpPolicyDelete() {
// Given:
Expand Down

0 comments on commit 5e026d4

Please sign in to comment.