From 08ff35a8bc35ee4120e7ac6bef945a6ccec8c200 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 8 Jul 2020 21:12:07 -0700 Subject: [PATCH] fix: windowed tables now have cleanup policy compact+delete (#5743) BREAKING CHANGE: ksqlDB now creates windowed tables with cleanup policy "compact,delete", rather than "compact". Also, topics that back streams are always created with cleanup policy "delete", rather than the broker default (by default, "delete"). --- .../ksql/services/KafkaTopicClientImpl.java | 22 +++---- .../ksql/topic/TopicCreateInjector.java | 27 +++++---- .../ksql/integration/WindowingIntTest.java | 6 +- .../services/KafkaTopicClientImplTest.java | 57 ++++++++++++++++--- .../ksql/topic/TopicCreateInjectorTest.java | 9 +-- 5 files changed, 86 insertions(+), 35 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java index 72be3d1fbdf5..e41e774ddb2c 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java @@ -240,17 +240,17 @@ public boolean addTopicConfig(final String topicName, final Map overr @Override public TopicCleanupPolicy getTopicCleanupPolicy(final String topicName) { final String policy = getTopicConfig(topicName) - .getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, ""); - - switch (policy) { - case "compact": - return TopicCleanupPolicy.COMPACT; - case "delete": - return TopicCleanupPolicy.DELETE; - case "compact+delete": - return TopicCleanupPolicy.COMPACT_DELETE; - default: - throw new KsqlException("Could not get the topic configs for : " + topicName); + .getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, "") + .toLowerCase(); + + if (policy.equals("compact")) { + return TopicCleanupPolicy.COMPACT; + } else if (policy.equals("delete")) { + return TopicCleanupPolicy.DELETE; + } else if (policy.contains("compact") && policy.contains("delete")) { + return TopicCleanupPolicy.COMPACT_DELETE; + } else { + throw new KsqlException("Could not get the topic configs for : " + topicName); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java index b6f897be5575..c1a1504c825e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java @@ -23,8 +23,8 @@ import io.confluent.ksql.parser.properties.with.CreateSourceProperties; import io.confluent.ksql.parser.tree.CreateAsSelect; import io.confluent.ksql.parser.tree.CreateSource; +import io.confluent.ksql.parser.tree.CreateStreamAsSelect; import io.confluent.ksql.parser.tree.CreateTable; -import io.confluent.ksql.parser.tree.CreateTableAsSelect; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.properties.with.CommonCreateConfigs; import io.confluent.ksql.services.KafkaTopicClient; @@ -34,7 +34,6 @@ import io.confluent.ksql.topic.TopicProperties.Builder; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -126,7 +125,10 @@ private ConfiguredStatement injectForCreateSource( properties.getPartitions(), properties.getReplicas()); - createTopic(topicPropertiesBuilder, createSource instanceof CreateTable); + final String topicCleanUpPolicy = createSource instanceof CreateTable + ? TopicConfig.CLEANUP_POLICY_COMPACT : TopicConfig.CLEANUP_POLICY_DELETE; + + createTopic(topicPropertiesBuilder, topicCleanUpPolicy); return statement; } @@ -157,10 +159,16 @@ private ConfiguredStatement injectForCreateAsSelec properties.getPartitions(), properties.getReplicas()); - final boolean shouldCompactTopic = createAsSelect instanceof CreateTableAsSelect - && !createAsSelect.getQuery().getWindow().isPresent(); + final String topicCleanUpPolicy; + if (createAsSelect instanceof CreateStreamAsSelect) { + topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_DELETE; + } else { + topicCleanUpPolicy = createAsSelect.getQuery().getWindow().isPresent() + ? TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE + : TopicConfig.CLEANUP_POLICY_COMPACT; + } - final TopicProperties info = createTopic(topicPropertiesBuilder, shouldCompactTopic); + final TopicProperties info = createTopic(topicPropertiesBuilder, topicCleanUpPolicy); final T withTopic = (T) createAsSelect.copyWith(properties.withTopic( info.getTopicName(), @@ -175,13 +183,12 @@ private ConfiguredStatement injectForCreateAsSelec private TopicProperties createTopic( final Builder topicPropertiesBuilder, - final boolean shouldCompactTopic + final String topicCleanUpPolicy ) { final TopicProperties info = topicPropertiesBuilder.build(); - final Map config = shouldCompactTopic - ? ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) - : Collections.emptyMap(); + final Map config = + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy); topicClient.createTopic(info.getTopicName(), info.getPartitions(), info.getReplicas(), config); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java index 4d7d92c7372d..c1bb3985c721 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/WindowingIntTest.java @@ -156,7 +156,7 @@ public void shouldAggregateTumblingWindow() { // Then: assertOutputOf(resultStream0, expected, is(expected)); - assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 3, resultStream0); + assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 3, resultStream0); } @Test @@ -179,7 +179,7 @@ public void shouldAggregateHoppingWindow() { // Then: assertOutputOf(resultStream0, expected, is(expected)); - assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 3, resultStream0); + assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 3, resultStream0); } @Test @@ -210,7 +210,7 @@ public void shouldAggregateSessionWindow() { // Then: assertOutputOf(resultStream0, expected, mapHasItems(expected)); - assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 2, resultStream0); + assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 2, resultStream0); } private void givenTable(final String sql) { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java index 69e6ea1026c1..a3162722a37a 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java @@ -16,6 +16,8 @@ package io.confluent.ksql.services; import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; @@ -39,6 +41,7 @@ import io.confluent.ksql.exception.KafkaResponseGetFailedException; import io.confluent.ksql.exception.KafkaTopicExistsException; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; +import io.confluent.ksql.services.KafkaTopicClient.TopicCleanupPolicy; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -459,6 +462,46 @@ public void shouldGetTopicConfig() { assertThat(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), is("1")); } + @Test + public void shouldGetTopicCleanUpPolicyDelete() { + // Given: + givenTopicConfigs( + "foo", + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE) + ); + + // When / Then: + assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"), + is(TopicCleanupPolicy.DELETE)); + } + + @Test + public void shouldGetTopicCleanUpPolicyCompact() { + // Given: + givenTopicConfigs( + "foo", + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT) + ); + + // When / Then: + assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"), + is(TopicCleanupPolicy.COMPACT)); + } + + @Test + public void shouldGetTopicCleanUpPolicyCompactAndDelete() { + // Given: + givenTopicConfigs( + "foo", + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, + CLEANUP_POLICY_COMPACT + "," + CLEANUP_POLICY_DELETE) + ); + + // When / Then: + assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"), + is(TopicCleanupPolicy.COMPACT_DELETE)); + } + @Test public void shouldThrowOnNoneRetryableGetTopicConfigError() { // Given: @@ -517,7 +560,7 @@ public void shouldSetStringTopicConfig() { ); final Map configOverrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT ); // When: @@ -528,7 +571,7 @@ public void shouldSetStringTopicConfig() { verify(adminClient).incrementalAlterConfigs(ImmutableMap.of( topicResource("peter"), ImmutableSet.of( - setConfig(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT) + setConfig(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT) ) )); } @@ -570,7 +613,7 @@ public void shouldFallBackToAddTopicConfigForOlderBrokers() { ); final Map overrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT ); when(adminClient.incrementalAlterConfigs(any())) @@ -583,7 +626,7 @@ public void shouldFallBackToAddTopicConfigForOlderBrokers() { verify(adminClient).alterConfigs(ImmutableMap.of( topicResource("peter"), new Config(ImmutableSet.of( - new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), + new ConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1234") )) )); @@ -594,12 +637,12 @@ public void shouldNotAlterStringConfigIfMatchingConfigOverrideExists() { // Given: givenTopicConfigs( "peter", - overriddenConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), + overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT), defaultConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy") ); final Map overrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT ); // When: @@ -641,7 +684,7 @@ public void shouldRetryAddingTopicConfig() { ); final Map overrides = ImmutableMap.of( - TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT ); when(adminClient.incrementalAlterConfigs(any())) diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java index 3dc720b470eb..32c78ccb5fa2 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java @@ -345,7 +345,7 @@ public void shouldCreateMissingTopic() { "expectedName", 10, (short) 10, - ImmutableMap.of()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); } @Test @@ -362,7 +362,7 @@ public void shouldCreateMissingTopicForCreate() { "expectedName", 10, (short) 10, - ImmutableMap.of()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); } @Test @@ -401,7 +401,7 @@ public void shouldCreateMissingTopicWithCompactCleanupPolicyForCreateTable() { } @Test - public void shouldCreateMissingTopicWithDefaultCleanupPolicyForWindowedTables() { + public void shouldCreateMissingTopicWithCompactAndDeleteCleanupPolicyForWindowedTables() { // Given: givenStatement("CREATE TABLE x WITH (kafka_topic='topic') " + "AS SELECT * FROM SOURCE WINDOW TUMBLING (SIZE 10 SECONDS);"); @@ -415,7 +415,8 @@ public void shouldCreateMissingTopicWithDefaultCleanupPolicyForWindowedTables() "expectedName", 10, (short) 10, - ImmutableMap.of()); + ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, + TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)); } @Test