Skip to content

Commit

Permalink
fix: windowed tables now have cleanup policy compact+delete (confluen…
Browse files Browse the repository at this point in the history
…tinc#5743)

BREAKING CHANGE: ksqlDB now creates windowed tables with cleanup policy "compact,delete", rather than "compact". Also, topics that back streams are always created with cleanup policy "delete", rather than the broker default (by default, "delete").
  • Loading branch information
vcrfxia committed Jul 16, 2020
1 parent c87aa69 commit 08ff35a
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,17 @@ public boolean addTopicConfig(final String topicName, final Map<String, ?> overr
@Override
public TopicCleanupPolicy getTopicCleanupPolicy(final String topicName) {
final String policy = getTopicConfig(topicName)
.getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, "");

switch (policy) {
case "compact":
return TopicCleanupPolicy.COMPACT;
case "delete":
return TopicCleanupPolicy.DELETE;
case "compact+delete":
return TopicCleanupPolicy.COMPACT_DELETE;
default:
throw new KsqlException("Could not get the topic configs for : " + topicName);
.getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, "")
.toLowerCase();

if (policy.equals("compact")) {
return TopicCleanupPolicy.COMPACT;
} else if (policy.equals("delete")) {
return TopicCleanupPolicy.DELETE;
} else if (policy.contains("compact") && policy.contains("delete")) {
return TopicCleanupPolicy.COMPACT_DELETE;
} else {
throw new KsqlException("Could not get the topic configs for : " + topicName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.services.KafkaTopicClient;
Expand All @@ -34,7 +34,6 @@
import io.confluent.ksql.topic.TopicProperties.Builder;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -126,7 +125,10 @@ private ConfiguredStatement<? extends CreateSource> injectForCreateSource(
properties.getPartitions(),
properties.getReplicas());

createTopic(topicPropertiesBuilder, createSource instanceof CreateTable);
final String topicCleanUpPolicy = createSource instanceof CreateTable
? TopicConfig.CLEANUP_POLICY_COMPACT : TopicConfig.CLEANUP_POLICY_DELETE;

createTopic(topicPropertiesBuilder, topicCleanUpPolicy);

return statement;
}
Expand Down Expand Up @@ -157,10 +159,16 @@ private <T extends CreateAsSelect> ConfiguredStatement<?> injectForCreateAsSelec
properties.getPartitions(),
properties.getReplicas());

final boolean shouldCompactTopic = createAsSelect instanceof CreateTableAsSelect
&& !createAsSelect.getQuery().getWindow().isPresent();
final String topicCleanUpPolicy;
if (createAsSelect instanceof CreateStreamAsSelect) {
topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_DELETE;
} else {
topicCleanUpPolicy = createAsSelect.getQuery().getWindow().isPresent()
? TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE
: TopicConfig.CLEANUP_POLICY_COMPACT;
}

final TopicProperties info = createTopic(topicPropertiesBuilder, shouldCompactTopic);
final TopicProperties info = createTopic(topicPropertiesBuilder, topicCleanUpPolicy);

final T withTopic = (T) createAsSelect.copyWith(properties.withTopic(
info.getTopicName(),
Expand All @@ -175,13 +183,12 @@ private <T extends CreateAsSelect> ConfiguredStatement<?> injectForCreateAsSelec

private TopicProperties createTopic(
final Builder topicPropertiesBuilder,
final boolean shouldCompactTopic
final String topicCleanUpPolicy
) {
final TopicProperties info = topicPropertiesBuilder.build();

final Map<String, ?> config = shouldCompactTopic
? ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
: Collections.emptyMap();
final Map<String, ?> config =
ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy);

topicClient.createTopic(info.getTopicName(), info.getPartitions(), info.getReplicas(), config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void shouldAggregateTumblingWindow() {

// Then:
assertOutputOf(resultStream0, expected, is(expected));
assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 3, resultStream0);
assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 3, resultStream0);
}

@Test
Expand All @@ -179,7 +179,7 @@ public void shouldAggregateHoppingWindow() {

// Then:
assertOutputOf(resultStream0, expected, is(expected));
assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 3, resultStream0);
assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 3, resultStream0);
}

@Test
Expand Down Expand Up @@ -210,7 +210,7 @@ public void shouldAggregateSessionWindow() {

// Then:
assertOutputOf(resultStream0, expected, mapHasItems(expected));
assertTopicsCleanedUp(TopicCleanupPolicy.DELETE, 2, resultStream0);
assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT_DELETE, 2, resultStream0);
}

private void givenTable(final String sql) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.confluent.ksql.services;

import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
Expand All @@ -39,6 +41,7 @@
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.exception.KafkaTopicExistsException;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.services.KafkaTopicClient.TopicCleanupPolicy;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -459,6 +462,46 @@ public void shouldGetTopicConfig() {
assertThat(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), is("1"));
}

@Test
public void shouldGetTopicCleanUpPolicyDelete() {
// Given:
givenTopicConfigs(
"foo",
overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE)
);

// When / Then:
assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"),
is(TopicCleanupPolicy.DELETE));
}

@Test
public void shouldGetTopicCleanUpPolicyCompact() {
// Given:
givenTopicConfigs(
"foo",
overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT)
);

// When / Then:
assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"),
is(TopicCleanupPolicy.COMPACT));
}

@Test
public void shouldGetTopicCleanUpPolicyCompactAndDelete() {
// Given:
givenTopicConfigs(
"foo",
overriddenConfigEntry(CLEANUP_POLICY_CONFIG,
CLEANUP_POLICY_COMPACT + "," + CLEANUP_POLICY_DELETE)
);

// When / Then:
assertThat(kafkaTopicClient.getTopicCleanupPolicy("foo"),
is(TopicCleanupPolicy.COMPACT_DELETE));
}

@Test
public void shouldThrowOnNoneRetryableGetTopicConfigError() {
// Given:
Expand Down Expand Up @@ -517,7 +560,7 @@ public void shouldSetStringTopicConfig() {
);

final Map<String, ?> configOverrides = ImmutableMap.of(
TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT
CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT
);

// When:
Expand All @@ -528,7 +571,7 @@ public void shouldSetStringTopicConfig() {
verify(adminClient).incrementalAlterConfigs(ImmutableMap.of(
topicResource("peter"),
ImmutableSet.of(
setConfig(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT)
setConfig(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT)
)
));
}
Expand Down Expand Up @@ -570,7 +613,7 @@ public void shouldFallBackToAddTopicConfigForOlderBrokers() {
);

final Map<String, ?> overrides = ImmutableMap.of(
TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT
CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT
);

when(adminClient.incrementalAlterConfigs(any()))
Expand All @@ -583,7 +626,7 @@ public void shouldFallBackToAddTopicConfigForOlderBrokers() {
verify(adminClient).alterConfigs(ImmutableMap.of(
topicResource("peter"),
new Config(ImmutableSet.of(
new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT),
new ConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT),
new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1234")
))
));
Expand All @@ -594,12 +637,12 @@ public void shouldNotAlterStringConfigIfMatchingConfigOverrideExists() {
// Given:
givenTopicConfigs(
"peter",
overriddenConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT),
overriddenConfigEntry(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT),
defaultConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")
);

final Map<String, ?> overrides = ImmutableMap.of(
TopicConfig.CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT
CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT
);

// When:
Expand Down Expand Up @@ -641,7 +684,7 @@ public void shouldRetryAddingTopicConfig() {
);

final Map<String, ?> overrides = ImmutableMap.of(
TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT
CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT
);

when(adminClient.incrementalAlterConfigs(any()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public void shouldCreateMissingTopic() {
"expectedName",
10,
(short) 10,
ImmutableMap.of());
ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE));
}

@Test
Expand All @@ -362,7 +362,7 @@ public void shouldCreateMissingTopicForCreate() {
"expectedName",
10,
(short) 10,
ImmutableMap.of());
ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE));
}

@Test
Expand Down Expand Up @@ -401,7 +401,7 @@ public void shouldCreateMissingTopicWithCompactCleanupPolicyForCreateTable() {
}

@Test
public void shouldCreateMissingTopicWithDefaultCleanupPolicyForWindowedTables() {
public void shouldCreateMissingTopicWithCompactAndDeleteCleanupPolicyForWindowedTables() {
// Given:
givenStatement("CREATE TABLE x WITH (kafka_topic='topic') "
+ "AS SELECT * FROM SOURCE WINDOW TUMBLING (SIZE 10 SECONDS);");
Expand All @@ -415,7 +415,8 @@ public void shouldCreateMissingTopicWithDefaultCleanupPolicyForWindowedTables()
"expectedName",
10,
(short) 10,
ImmutableMap.of());
ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE));
}

@Test
Expand Down

0 comments on commit 08ff35a

Please sign in to comment.