From 9e2652e7915690400fe048995d339824a5f919e3 Mon Sep 17 00:00:00 2001 From: Aleksandr Sorokoumov Date: Fri, 17 Dec 2021 11:48:51 +0100 Subject: [PATCH] fixup! fix: Consider topics created by join operations internal --- .../io/confluent/ksql/services/KafkaTopicClientImpl.java | 4 ++-- .../ksql/services/KafkaTopicClientImplTest.java | 9 +++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java index 339caad6b01e..cd908aa3feae 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java @@ -375,9 +375,9 @@ private Config getConfig() { private static boolean isInternalTopic(final String topicName, final String applicationId) { return topicName.startsWith(applicationId + "-") && (topicName.endsWith(KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX) - || topicName.endsWith(KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX)) + || topicName.endsWith(KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX) || topicName.matches(KsqlConstants.STREAMS_JOIN_REGISTRATION_TOPIC_PATTERN) - || topicName.matches(KsqlConstants.STREAMS_JOIN_RESPONSE_TOPIC_PATTERN); + || topicName.matches(KsqlConstants.STREAMS_JOIN_RESPONSE_TOPIC_PATTERN)); } private void validateTopicProperties( diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java index 2d8f3131f73e..ccb3ea644dcf 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java @@ -453,12 +453,21 @@ public void shouldDeleteInternalTopics() { + "-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000012-topic"; final String internalTopic3 = applicationId + "-something-changelog"; final String internalTopic4 = applicationId + "-something-repartition"; + // the next 2 topics four are not prefixed with the application id, so they are not internal + final String customTopic1 = "what-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000012-topic"; + final String customTopic2 = "eva-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000012-topic"; + final String customTopic3 = "-something-changelog"; + final String customTopic4 = "-something-repartition"; givenTopicExists("topic1", 1, 1); givenTopicExists(internalTopic1, 1, 1); givenTopicExists(internalTopic2, 1, 1); givenTopicExists(internalTopic3, 1, 1); givenTopicExists(internalTopic4, 1, 1); + givenTopicExists(customTopic1, 1, 1); + givenTopicExists(customTopic2, 1, 1); + givenTopicExists(customTopic3, 1, 1); + givenTopicExists(customTopic4, 1, 1); givenTopicExists("topic2", 1, 1); // When: