Skip to content

Commit

Permalink
fix: show topics doesn't display topics with different casing (#4159)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang authored Dec 17, 2019
1 parent 2f41aac commit 0ac8747
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private static Map<String, TopicDescription> filterKsqlInternalTopics(
for (final Map.Entry<String, TopicDescription> entry : kafkaTopicDescriptions.entrySet()) {
if (!entry.getKey().startsWith(serviceId + persistentQueryPrefix)
&& !entry.getKey().startsWith(serviceId + transientQueryPrefix)) {
filteredKafkaTopics.put(entry.getKey().toLowerCase(), entry.getValue());
filteredKafkaTopics.put(entry.getKey(), entry.getValue());
}
}
return filteredKafkaTopics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,31 +34,60 @@
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class ListTopicsExecutorTest {

@Rule public final TemporaryEngine engine = new TemporaryEngine();
@Mock
private AdminClient adminClient;

private ServiceContext serviceContext;

@Before
public void setUp() {
serviceContext = TestServiceContext.create(
engine.getServiceContext().getKafkaClientSupplier(),
adminClient,
engine.getServiceContext().getTopicClient(),
engine.getServiceContext().getSchemaRegistryClientFactory(),
engine.getServiceContext().getConnectClient()
);
}

@Test
public void shouldListKafkaTopics() {
// Given:
engine.givenKafkaTopic("topic1");
engine.givenKafkaTopic("topic2");

final AdminClient mockAdminClient = mock(AdminClient.class);
// When:
final KafkaTopicsList topicsList =
(KafkaTopicsList) CustomExecutors.LIST_TOPICS.execute(
engine.configure("LIST TOPICS;"),
ImmutableMap.of(),
engine.getEngine(),
serviceContext
).orElseThrow(IllegalStateException::new);

final ServiceContext serviceContext = TestServiceContext.create(
engine.getServiceContext().getKafkaClientSupplier(),
mockAdminClient,
engine.getServiceContext().getTopicClient(),
engine.getServiceContext().getSchemaRegistryClientFactory(),
engine.getServiceContext().getConnectClient()
);
// Then:
assertThat(topicsList.getTopics(), containsInAnyOrder(
new KafkaTopicInfo("topic1", ImmutableList.of(1)),
new KafkaTopicInfo("topic2", ImmutableList.of(1))
));
}

@Test
public void shouldListKafkaTopicsThatDifferByCase() {
// Given:
engine.givenKafkaTopic("topic1");
engine.givenKafkaTopic("toPIc1");

// When:
final KafkaTopicsList topicsList =
Expand All @@ -72,7 +101,7 @@ public void shouldListKafkaTopics() {
// Then:
assertThat(topicsList.getTopics(), containsInAnyOrder(
new KafkaTopicInfo("topic1", ImmutableList.of(1)),
new KafkaTopicInfo("topic2", ImmutableList.of(1))
new KafkaTopicInfo("toPIc1", ImmutableList.of(1))
));
}

Expand All @@ -82,22 +111,13 @@ public void shouldListKafkaTopicsExtended() {
engine.givenKafkaTopic("topic1");
engine.givenKafkaTopic("topic2");

final AdminClient mockAdminClient = mock(AdminClient.class);
final ListConsumerGroupsResult result = mock(ListConsumerGroupsResult.class);
final KafkaFutureImpl<Collection<ConsumerGroupListing>> groups = new KafkaFutureImpl<>();

when(result.all()).thenReturn(groups);
when(mockAdminClient.listConsumerGroups()).thenReturn(result);
when(adminClient.listConsumerGroups()).thenReturn(result);
groups.complete(ImmutableList.of());

final ServiceContext serviceContext = TestServiceContext.create(
engine.getServiceContext().getKafkaClientSupplier(),
mockAdminClient,
engine.getServiceContext().getTopicClient(),
engine.getServiceContext().getSchemaRegistryClientFactory(),
engine.getServiceContext().getConnectClient()
);

// When:
final KafkaTopicsListExtended topicsList =
(KafkaTopicsListExtended) CustomExecutors.LIST_TOPICS.execute(
Expand All @@ -113,5 +133,4 @@ public void shouldListKafkaTopicsExtended() {
new KafkaTopicInfoExtended("topic2", ImmutableList.of(1), 0, 0)
));
}

}

0 comments on commit 0ac8747

Please sign in to comment.