
fix: windowed tables now have cleanup policy compact+delete #5743

Merged 4 commits on Jul 9, 2020
Changes from 1 commit
@@ -23,8 +23,8 @@
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.services.KafkaTopicClient;
@@ -34,7 +34,6 @@
import io.confluent.ksql.topic.TopicProperties.Builder;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -126,7 +125,10 @@ private ConfiguredStatement<? extends CreateSource> injectForCreateSource(
properties.getPartitions(),
properties.getReplicas());

createTopic(topicPropertiesBuilder, createSource instanceof CreateTable);
final String topicCleanUpPolicy = createSource instanceof CreateTable
? TopicConfig.CLEANUP_POLICY_COMPACT : TopicConfig.CLEANUP_POLICY_DELETE;

createTopic(topicPropertiesBuilder, topicCleanUpPolicy);

return statement;
}
@@ -157,10 +159,16 @@ private <T extends CreateAsSelect> ConfiguredStatement<?> injectForCreateAsSelec
properties.getPartitions(),
properties.getReplicas());

final boolean shouldCompactTopic = createAsSelect instanceof CreateTableAsSelect
&& !createAsSelect.getQuery().getWindow().isPresent();
final String topicCleanUpPolicy;
if (createAsSelect instanceof CreateStreamAsSelect) {
topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_DELETE;
} else {
topicCleanUpPolicy = createAsSelect.getQuery().getWindow().isPresent()
? TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE
: TopicConfig.CLEANUP_POLICY_COMPACT;
}

final TopicProperties info = createTopic(topicPropertiesBuilder, shouldCompactTopic);
final TopicProperties info = createTopic(topicPropertiesBuilder, topicCleanUpPolicy);

final T withTopic = (T) createAsSelect.copyWith(properties.withTopic(
info.getTopicName(),
@@ -175,13 +183,12 @@ private <T extends CreateAsSelect> ConfiguredStatement<?> injectForCreateAsSelec

private TopicProperties createTopic(
final Builder topicPropertiesBuilder,
final boolean shouldCompactTopic
final String topicCleanUpPolicy
) {
final TopicProperties info = topicPropertiesBuilder.build();

final Map<String, ?> config = shouldCompactTopic
? ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
: Collections.emptyMap();
final Map<String, ?> config =
ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy);
Comment on lines +190 to +191
Contributor:
It seems that for CREATE STREAM we were previously using the default for the cluster, but now we've changed that to explicitly specify delete. Is that correct/desired?

Contributor Author:
The default is delete: https://kafka.apache.org/documentation/#cleanup.policy

I thought explicitly specifying the policy was cleaner than having empty vs non-empty maps, but if there are situations you're concerned about in terms of correctness we should definitely investigate / return to passing empty maps.

Contributor:
Something to check: does the CREATE STREAM code path change the config of a topic to delete if it already exists? Another thing to consider is whether it's possible (though I wouldn't understand why anyone would do this) to change the cluster default for cleanup.policy. If neither of these is a real concern, then I'm happy making it explicit.

Contributor Author:
Turns out it's possible to change the default cleanup.policy using the broker config log.cleanup.policy. I've switched this PR back to specifying empty configs in the case of streams in order to avoid a breaking change, but this behavior (using the default for streams and forcing tables to be compacted) feels weird to me. Is it preferable to use the broker default for streams, rather than specifying delete?
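For context, the cluster-wide default really is adjustable at runtime; a sketch using the stock kafka-configs.sh tool (the bootstrap address is an assumption, and this is a config fragment, not part of the PR):

```shell
# Inspect the dynamic cluster-wide defaults; log.cleanup.policy falls back
# to "delete" when it has never been overridden.
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers --entity-default --describe

# Change the cluster-wide default cleanup policy dynamically
# (hypothetical choice of compact, purely to illustrate the scenario).
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers --entity-default --alter \
  --add-config log.cleanup.policy=compact
```

Any topic created afterwards without an explicit cleanup.policy would inherit compact, which is exactly why passing empty configs for streams is observably different from passing delete.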

Contributor:
Good question, there's definitely a strong argument toward forcing delete for streams... I could be convinced either way, perhaps now I'm actually leaning a little bit toward your original code.

Contributor Author:
OK, I'll go with the original then.

Regarding your other question:

Something to check is to see if the CREATE STREAM code path will change the config of a topic to delete if it already exists.

This never happens since the KafkaTopicClient does not update topics if they exist. (It also doesn't even validate the cleanup policy, only number of replicas and partitions:

validateTopicProperties(topic, numPartitions, replicationFactor);
)
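The point about existing topics can be distilled into a small sketch (hypothetical class and parameter names; the real check is KafkaTopicClient.validateTopicProperties): only counts are compared, so the cleanup policy of a pre-existing topic is never touched.

```java
// Hypothetical distillation of the existing-topic check in KafkaTopicClient:
// only partition and replica counts are compared; topic configs such as
// cleanup.policy are never validated or rewritten.
final class TopicValidator {
    static void validate(int actualPartitions, int actualReplicas,
                         int requiredPartitions, int requiredReplicas) {
        if (actualPartitions != requiredPartitions) {
            throw new IllegalStateException("existing topic has a different partition count");
        }
        if (actualReplicas != requiredReplicas) {
            throw new IllegalStateException("existing topic has a different replication factor");
        }
        // Note: no comparison of cleanup.policy or any other topic config.
    }
}
```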


topicClient.createTopic(info.getTopicName(), info.getPartitions(), info.getReplicas(), config);

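Taken together, the cleanup-policy selection in this commit can be summarized as a standalone helper (hypothetical names and raw strings for self-containment; the PR inlines this logic in the injector using the TopicConfig constants):

```java
// Hypothetical summary of the cleanup-policy selection in this commit.
final class CleanupPolicySelector {
    // CREATE TABLE / CREATE STREAM sources: tables compact, streams delete.
    static String forCreateSource(boolean isTable) {
        return isTable ? "compact" : "delete";
    }

    // CSAS/CTAS sinks: stream sinks delete; windowed table sinks get
    // compact,delete so expired windows can be retained away; unwindowed
    // table sinks compact.
    static String forCreateAsSelect(boolean isTableSink, boolean isWindowed) {
        if (!isTableSink) {
            return "delete";
        }
        return isWindowed ? "compact,delete" : "compact";
    }
}
```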
@@ -345,7 +345,7 @@ public void shouldCreateMissingTopic() {
"expectedName",
10,
(short) 10,
ImmutableMap.of());
ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE));
}

@Test
@@ -362,7 +362,7 @@ public void shouldCreateMissingTopicForCreate() {
"expectedName",
10,
(short) 10,
ImmutableMap.of());
ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE));
}

@Test
@@ -401,7 +401,7 @@ public void shouldCreateMissingTopicWithCompactCleanupPolicyForCreateTable() {
}

@Test
public void shouldCreateMissingTopicWithDefaultCleanupPolicyForWindowedTables() {
public void shouldCreateMissingTopicWithCompactAndDeleteCleanupPolicyForWindowedTables() {
// Given:
givenStatement("CREATE TABLE x WITH (kafka_topic='topic') "
+ "AS SELECT * FROM SOURCE WINDOW TUMBLING (SIZE 10 SECONDS);");
@@ -415,7 +415,8 @@ public void shouldCreateMissingTopicWithDefaultCleanupPolicyForWindowedTables()
"expectedName",
10,
(short) 10,
ImmutableMap.of());
ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE));
}

@Test