diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java index 72be3d1fbdf5..e41e774ddb2c 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java @@ -240,17 +240,17 @@ public boolean addTopicConfig(final String topicName, final Map overr @Override public TopicCleanupPolicy getTopicCleanupPolicy(final String topicName) { final String policy = getTopicConfig(topicName) - .getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, ""); - - switch (policy) { - case "compact": - return TopicCleanupPolicy.COMPACT; - case "delete": - return TopicCleanupPolicy.DELETE; - case "compact+delete": - return TopicCleanupPolicy.COMPACT_DELETE; - default: - throw new KsqlException("Could not get the topic configs for : " + topicName); + .getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, "") + .toLowerCase(); + + if (policy.equals("compact")) { + return TopicCleanupPolicy.COMPACT; + } else if (policy.equals("delete")) { + return TopicCleanupPolicy.DELETE; + } else if (policy.contains("compact") && policy.contains("delete")) { + return TopicCleanupPolicy.COMPACT_DELETE; + } else { + throw new KsqlException("Could not get the topic configs for : " + topicName); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java index b6f897be5575..c1a1504c825e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java @@ -23,8 +23,8 @@ import io.confluent.ksql.parser.properties.with.CreateSourceProperties; import io.confluent.ksql.parser.tree.CreateAsSelect; import io.confluent.ksql.parser.tree.CreateSource; +import io.confluent.ksql.parser.tree.CreateStreamAsSelect; import io.confluent.ksql.parser.tree.CreateTable; -import io.confluent.ksql.parser.tree.CreateTableAsSelect; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.properties.with.CommonCreateConfigs; import io.confluent.ksql.services.KafkaTopicClient; @@ -34,7 +34,6 @@ import io.confluent.ksql.topic.TopicProperties.Builder; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -126,7 +125,10 @@ private ConfiguredStatement injectForCreateSource( properties.getPartitions(), properties.getReplicas()); - createTopic(topicPropertiesBuilder, createSource instanceof CreateTable); + final String topicCleanUpPolicy = createSource instanceof CreateTable + ? TopicConfig.CLEANUP_POLICY_COMPACT : TopicConfig.CLEANUP_POLICY_DELETE; + + createTopic(topicPropertiesBuilder, topicCleanUpPolicy); return statement; } @@ -157,10 +159,16 @@ private ConfiguredStatement injectForCreateAsSelec properties.getPartitions(), properties.getReplicas()); - final boolean shouldCompactTopic = createAsSelect instanceof CreateTableAsSelect - && !createAsSelect.getQuery().getWindow().isPresent(); + final String topicCleanUpPolicy; + if (createAsSelect instanceof CreateStreamAsSelect) { + topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_DELETE; + } else { + topicCleanUpPolicy = createAsSelect.getQuery().getWindow().isPresent() + ? TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE + : TopicConfig.CLEANUP_POLICY_COMPACT; + } - final TopicProperties info = createTopic(topicPropertiesBuilder, shouldCompactTopic); + final TopicProperties info = createTopic(topicPropertiesBuilder, topicCleanUpPolicy); final T withTopic = (T) createAsSelect.copyWith(properties.withTopic( info.getTopicName(), @@ -175,13 +183,12 @@ private ConfiguredStatement injectForCreateAsSelec private TopicProperties createTopic( final Builder topicPropertiesBuilder, - final boolean shouldCompactTopic + final String topicCleanUpPolicy ) { final TopicProperties info = topicPropertiesBuilder.build(); - final Map config = shouldCompactTopic - ? ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) - : Collections.emptyMap(); + final Map config = + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy); topicClient.createTopic(info.getTopicName(), info.getPartitions(), info.getReplicas(), config); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java index 4d7d92c7372d..c1bb3985c721 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java @@ -156,7 +156,7 @@ public void shouldAggregateTumblingWindow() { // Then: assertOutputOf(resultStream0, expected, is(expected)); - assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 3, resultStream0); + assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 3, resultStream0); } @Test @@ -179,7 +179,7 @@ public void shouldAggregateHoppingWindow() { // Then: assertOutputOf(resultStream0, expected, is(expected)); - assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 3, resultStream0); + assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 3, resultStream0); } @Test @@ -210,7 +210,7 @@ public void shouldAggregateSessionWindow() { // Then: assertOutputOf(resultStream0, expected, mapHasItems(expected)); - assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 2, resultStream0); + assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 2, resultStream0); } private void givenTable(final String sql) { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java index 69e6ea1026c1..a3162722a37a 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java @@ -16,6 +16,8 @@ package io.confluent.ksql.services; import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; @@ -39,6 +41,7 @@ import io.confluent.ksql.exception.KafkaResponseGetFailedException; import io.confluent.ksql.exception.KafkaTopicExistsException; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; +import io.confluent.ksql.services.KafkaTopicClient.TopicCleanupPolicy; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -459,6 +462,46 @@ public void shouldGetTopicConfig() { assertThat(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), is("1")); } + @Test + public void shouldGetTopicCleanUpPolicyDelete() { + // Given: + givenTopicConfigs( + "foo", + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE) + ); + + // When / Then: + assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"), + is(TopicCleanupPolicy.DELETE)); + } + + @Test + public void shouldGetTopicCleanUpPolicyCompact() { + // Given: + givenTopicConfigs( + "foo", + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT) + ); + + // When / Then: + assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"), + is(TopicCleanupPolicy.COMPACT)); + } + + @Test + public void shouldGetTopicCleanUpPolicyCompactAndDelete() { + // Given: + givenTopicConfigs( + "foo", + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, + CLEANUP_POLICY_COMPACT + "," + CLEANUP_POLICY_DELETE) + ); + + // When / Then: + assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"), + is(TopicCleanupPolicy.COMPACT_DELETE)); + } + @Test public void shouldThrowOnNoneRetryableGetTopicConfigError() { // Given: @@ -517,7 +560,7 @@ public void shouldSetStringTopicConfig() { ); final Map configOverrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT ); // When: @@ -528,7 +571,7 @@ public void shouldSetStringTopicConfig() { verify(adminClient).incrementalAlterConfigs(ImmutableMap.of( topicResource("peter"), ImmutableSet.of( - setConfig(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT) + setConfig(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT) ) )); } @@ -570,7 +613,7 @@ public void shouldFallBackToAddTopicConfigForOlderBrokers() { ); final Map overrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT ); when(adminClient.incrementalAlterConfigs(any())) @@ -583,7 +626,7 @@ public void shouldFallBackToAddTopicConfigForOlderBrokers() { verify(adminClient).alterConfigs(ImmutableMap.of( topicResource("peter"), new Config(ImmutableSet.of( - new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), + new ConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1234") )) )); @@ -594,12 +637,12 @@ public void shouldNotAlterStringConfigIfMatchingConfigOverrideExists() { // Given: givenTopicConfigs( "peter", - overriddenConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), defaultConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy") ); final Map overrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT ); // When: @@ -641,7 +684,7 @@ public void shouldRetryAddingTopicConfig() { ); final Map overrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT ); when(adminClient.incrementalAlterConfigs(any())) diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java index 3dc720b470eb..32c78ccb5fa2 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java @@ -345,7 +345,7 @@ public void shouldCreateMissingTopic() { "expectedName", 10, (short) 10, - ImmutableMap.of()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); } @Test @@ -362,7 +362,7 @@ public void shouldCreateMissingTopicForCreate() { "expectedName", 10, (short) 10, - ImmutableMap.of()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); } @Test @@ -401,7 +401,7 @@ public void shouldCreateMissingTopicWithCompactCleanupPolicyForCreateTable() { } @Test - public void shouldCreateMissingTopicWithDefaultCleanupPolicyForWindowedTables() { + public void shouldCreateMissingTopicWithCompactAndDeleteCleanupPolicyForWindowedTables() { // Given: givenStatement("CREATE TABLE x WITH (kafka_topic='topic') " + "AS SELECT * FROM SOURCE WINDOW TUMBLING (SIZE 10 SECONDS);"); @@ -415,7 +415,8 @@ public void shouldCreateMissingTopicWithDefaultCleanupPolicyForWindowedTables() "expectedName", 10, (short) 10, - ImmutableMap.of()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, + TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)); } @Test