Skip to content

Commit

Permalink
test: add test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Jul 17, 2019
1 parent 22bffea commit 008744c
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ public void deleteTopics(final Collection<String> topicsToDelete) {
+ "the Kafka cluster configuration.");
}

LOG.error(String.format("Could not delete topic '%s'", entry.getKey()), e);
failList.add(entry.getKey());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
Expand Down Expand Up @@ -239,9 +240,6 @@ public void shouldListTopicNames() {

@Test
public void shouldDeleteTopics() {
expect(adminClient.describeCluster()).andReturn(describeClusterResult());
expect(adminClient.describeConfigs(describeBrokerRequest()))
.andReturn(describeBrokerResult(Collections.emptyList()));
expect(adminClient.deleteTopics(anyObject())).andReturn(getDeleteTopicsResult());
replay(adminClient);
final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);
Expand All @@ -264,9 +262,6 @@ public void shouldReturnIfDeleteTopicsIsEmpty() {

@Test
public void shouldDeleteInternalTopics() {
expect(adminClient.describeCluster()).andReturn(describeClusterResult());
expect(adminClient.describeConfigs(describeBrokerRequest()))
.andReturn(describeBrokerResult(Collections.emptyList()));
expect(adminClient.listTopics()).andReturn(getListTopicsResultWithInternalTopics());
expect(adminClient.deleteTopics(Arrays.asList(internalTopic2, internalTopic1)))
.andReturn(getDeleteInternalTopicsResult());
Expand All @@ -280,47 +275,14 @@ public void shouldDeleteInternalTopics() {
}

@Test
public void shouldDeleteTopicsIfDeleteTopicEnableTrue() {
// Given:
givenDeleteTopicEnableTrue();
expect(adminClient.deleteTopics(anyObject())).andReturn(getDeleteTopicsResult());
replay(adminClient);
final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);

// When:
kafkaTopicClient.deleteTopics(Collections.singletonList(topicName2));

// Then:
verify(adminClient);
}

@Test
public void shouldDeleteTopicsIfBrokerDoesNotReturnValueForDeleteTopicEnable() {
// Given:
givenDeleteTopicEnableNotReturnedByBroker();
expect(adminClient.deleteTopics(anyObject())).andReturn(getDeleteTopicsResult());
replay(adminClient);
final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);

// When:
kafkaTopicClient.deleteTopics(Collections.singletonList(topicName2));

// Then:
verify(adminClient);
}

@Test
public void shouldNotDeleteTopicIfDeleteTopicEnableFalse() {
public void shouldDeleteTopicThrowOnTopicDeletionDisabledException() {
// Given:
givenDeleteTopicEnableFalse();
expect(adminClient.deleteTopics(anyObject())).andReturn(getTopicDeletionDisableException());
replay(adminClient);
final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);

// When:
kafkaTopicClient.deleteTopics(Collections.singletonList(topicName2));

// Then:
verify(adminClient);
}

@Test
Expand Down Expand Up @@ -597,29 +559,23 @@ private Collection<ConfigResource> describeBrokerRequest() {
return Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, node.idString()));
}

private void givenDeleteTopicEnableTrue() {
reset(adminClient);
expect(adminClient.describeCluster()).andReturn(describeClusterResult());

final ConfigEntry configEntryDeleteEnable = new ConfigEntry("delete.topic.enable", "true");
expect(adminClient.describeConfigs(describeBrokerRequest()))
.andReturn(describeBrokerResult(Collections.singletonList(configEntryDeleteEnable)));
}
private DeleteTopicsResult getTopicDeletionDisableException() {
final DeleteTopicsResult deleteTopicsResult = mock(DeleteTopicsResult.class);
final KafkaFuture<Void> kafkaFuture = mock(KafkaFuture.class);

private void givenDeleteTopicEnableFalse() {
reset(adminClient);
expect(adminClient.describeCluster()).andReturn(describeClusterResult());
try {
expect(kafkaFuture.get()).andThrow(
new TopicDeletionDisabledException("Topic deletion is disabled")
);
} catch (final Exception e) {
// this should not happen in the test
}

final ConfigEntry configEntryDeleteEnable = new ConfigEntry("delete.topic.enable", "false");
expect(adminClient.describeConfigs(describeBrokerRequest()))
.andReturn(describeBrokerResult(Collections.singletonList(configEntryDeleteEnable)));
}
expect(deleteTopicsResult.values())
.andReturn(Collections.singletonMap(topicName1, kafkaFuture));

private void givenDeleteTopicEnableNotReturnedByBroker() {
reset(adminClient);
expect(adminClient.describeCluster()).andReturn(describeClusterResult());
expect(adminClient.describeConfigs(describeBrokerRequest()))
.andReturn(describeBrokerResult(Collections.emptyList()));
replay(deleteTopicsResult);
return deleteTopicsResult;
}

private DescribeConfigsResult describeBrokerResult(final List<ConfigEntry> brokerConfigs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -374,6 +376,21 @@ public void shouldThrowIfCannotDeleteManagedTopic() {

}

@Test
// Regression test for topic deletion being disabled on the broker
// (TopicDeletionDisabledException): terminateCluster must NOT propagate the
// exception — cluster termination should proceed best-effort.
public void shouldNotThrowOnTopicDeletionDisabledException() throws Exception {
// Given: the topic exists in Kafka, the metastore, and the schema registry,
// but the topic client is stubbed to fail every delete with
// TopicDeletionDisabledException (as a broker with delete.topic.enable=false would).
givenTopicsExistInKafka("K_Foo");
givenSinkTopicsExistInMetastore("K_Foo");
givenSchemasForTopicsExistInSchemaRegistry("K_Foo");
doThrow(TopicDeletionDisabledException.class).when(kafkaTopicClient).deleteTopics(any());

// When: terminating the cluster must complete without throwing.
clusterTerminator.terminateCluster(ImmutableList.of("K_Foo"));

// Then: since the topic could not be deleted, its schema-registry subject
// must be left intact (schema cleanup is skipped for undeleted topics).
verifySchemaNotDeletedForTopic("K_Foo");
}

@Test
public void shouldNotThrowIfCannotCleanUpSchema() throws Exception {
// Given:
Expand Down

0 comments on commit 008744c

Please sign in to comment.