Skip to content

Commit

Permalink
Add test to verify that cleanup.policy is set to compact in the Kafka…
Browse files Browse the repository at this point in the history
…TopicClient (#1071)
  • Loading branch information
apurvam authored Mar 30, 2018
1 parent bd8ea1e commit e6df77d
Showing 1 changed file with 49 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
Expand Down Expand Up @@ -56,6 +57,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -283,6 +285,26 @@ public void shouldHandleRetriableGetTopicConfigError() {
assertThat(config.get(TopicConfig.COMPRESSION_TYPE_CONFIG), is("producer"));
}


@Test
public void shouldSetTopicCleanupPolicyToCompact() throws InterruptedException,
ExecutionException {
expect(adminClient.listTopics()).andReturn(getEmptyListTopicResult());

// Verify that the new topic configuration being passed to the admin client is what we expect.
NewTopic newTopic = new NewTopic(topicName1, 1, (short) 1);
newTopic.configs(Collections.singletonMap("cleanup.policy", "compact"));
expect(adminClient.createTopics(singleNewTopic(newTopic))).andReturn(getCreateTopicsResult());
replay(adminClient);

KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);
kafkaTopicClient.createTopic(topicName1,
1,
(short) 1,
Collections.singletonMap("cleanup.policy", "compact"));
verify(adminClient);
}

@Test
public void shouldAddTopicConfig() {
final Map<String, ?> overrides = ImmutableMap.of(
Expand Down Expand Up @@ -581,4 +603,31 @@ public void appendTo(final StringBuffer buffer) {
EasyMock.reportMatcher(new ConfigMatcher());
return null;
}

private static Collection<NewTopic> singleNewTopic(final NewTopic expected) {
class NewTopicsMatcher implements IArgumentMatcher {
@SuppressWarnings("unchecked")
@Override
public boolean matches(final Object argument) {
final Collection<NewTopic> newTopics = (Collection<NewTopic>) argument;
if (newTopics.size() != 1) {
return false;
}

final NewTopic actual = newTopics.iterator().next();
return Objects.equals(actual.name(), expected.name())
&& Objects.equals(actual.replicationFactor(), expected.replicationFactor())
&& Objects.equals(actual.numPartitions(), expected.numPartitions())
&& Objects.equals(actual.configs(), expected.configs());
}

@Override
public void appendTo(final StringBuffer buffer) {
buffer.append("{NewTopic").append(expected).append("}");
}
}

EasyMock.reportMatcher(new NewTopicsMatcher());
return null;
}
}

0 comments on commit e6df77d

Please sign in to comment.