Set the cleanup policy to compact for non-windowed tables. #1042

Merged
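In short: when the result of a persistent query is a non-windowed table (a `SchemaKTable` that is not windowed), the sink topic is now created with `cleanup.policy=compact`; stream sinks and windowed table sinks keep the delete policy. As a rough sketch of the effect at the Kafka level, assuming a plain `AdminClient` against a local broker (the topic name and settings here are illustrative, not the PR's helpers):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CompactedSinkTopicSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
    try (AdminClient adminClient = AdminClient.create(props)) {
      NewTopic sinkTopic = new NewTopic("USERS_TABLE_SINK", 4, (short) 1);
      // A non-windowed table's sink topic is a changelog keyed by row key, so
      // compaction keeps the latest record per key instead of deleting by age.
      sinkTopic.configs(Collections.singletonMap("cleanup.policy", "compact"));
      adminClient.createTopics(Collections.singleton(sinkTopic)).all().get();
    }
  }
}
```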
@@ -133,7 +133,10 @@ public SchemaKStream buildStream(
);

final KsqlStructuredDataOutputNode noRowKey = outputNodeBuilder.build();
createSinkTopic(noRowKey.getKafkaTopicName(), ksqlConfig, kafkaTopicClient);
createSinkTopic(noRowKey.getKafkaTopicName(),
ksqlConfig,
kafkaTopicClient,
shouldBeCompacted(result));
result.into(
noRowKey.getKafkaTopicName(),
noRowKey.getKsqlTopic().getKsqlTopicSerDe()
@@ -149,6 +152,11 @@ public SchemaKStream buildStream(
return result;
}

private boolean shouldBeCompacted(SchemaKStream result) {
return (result instanceof SchemaKTable)
&& !((SchemaKTable) result).isWindowed();
}

private SchemaKStream createOutputStream(
final SchemaKStream schemaKStream,
final KsqlStructuredDataOutputNode.Builder outputNodeBuilder,
@@ -200,13 +208,17 @@ private void addAvroSchemaToResultTopic(final Builder builder) {
private void createSinkTopic(
final String kafkaTopicName,
KsqlConfig ksqlConfig,
KafkaTopicClient kafkaTopicClient
KafkaTopicClient kafkaTopicClient,
boolean isCompacted
) {
int numberOfPartitions =
(Integer) ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY);
short numberOfReplications =
(Short) ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY);
kafkaTopicClient.createTopic(kafkaTopicName, numberOfPartitions, numberOfReplications);
kafkaTopicClient.createTopic(kafkaTopicName,
numberOfPartitions,
numberOfReplications,
isCompacted);
}

public Field getTimestampField() {
@@ -26,13 +26,20 @@

public interface KafkaTopicClient extends Closeable {

enum TopicCleanupPolicy {
COMPACT,
DELETE,
COMPACT_DELETE
}


/**
* Create a new topic with the specified name, number of partitions, and replication factor.
* Note: synchronous call; blocks until the broker responds.
*
* @param topic name of the topic to create
*/
void createTopic(String topic, int numPartitions, short replicatonFactor);
void createTopic(String topic, int numPartitions, short replicatonFactor, boolean isCompacted);

/**
* Create a new topic with the specified name, number of partitions, and replication factor.
@@ -45,7 +52,8 @@ void createTopic(
String topic,
int numPartitions,
short replicatonFactor,
Map<String, String> configs
Map<String, String> configs,
boolean isCompacted
);

/**
@@ -70,6 +78,14 @@ void createTopic(
*/
Map<String, TopicDescription> describeTopics(Collection<String> topicNames);

/**
* Returns the cleanup policy of the given topic.
* Note: synchronous call; blocks until the broker responds.
*
* @param topicName name of the topic to inspect
*/
TopicCleanupPolicy getTopicCleanupPolicy(String topicName);

/**
* Delete the list of the topics in the given list.
*/
@@ -18,6 +18,7 @@

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
@@ -34,6 +35,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -64,24 +66,31 @@ public KafkaTopicClientImpl(final AdminClient adminClient) {
public void createTopic(
final String topic,
final int numPartitions,
final short replicatonFactor
final short replicatonFactor,
boolean isCompacted
Reviewer comment (Contributor):
We should use the TopicCleanupPolicy here instead of isCompacted. Also, wouldn't it be better to use compact+delete for topics backing windowed aggregations?

Both of these changes can be made in a later PR targeting master
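A sketch of the shape that suggestion might take (a hypothetical interface, not part of this PR; it mirrors the `TopicCleanupPolicy` enum introduced below):

```java
// Hypothetical follow-up: callers pick the cleanup policy directly, so a
// windowed aggregation sink could request COMPACT_DELETE.
public interface PolicyAwareTopicClient {

  enum TopicCleanupPolicy { COMPACT, DELETE, COMPACT_DELETE }

  void createTopic(
      String topic,
      int numPartitions,
      short replicationFactor,
      TopicCleanupPolicy cleanupPolicy);
}
```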

) {
createTopic(topic, numPartitions, replicatonFactor, Collections.emptyMap());
createTopic(topic, numPartitions, replicatonFactor, Collections.emptyMap(), isCompacted);
}

@Override
public void createTopic(
final String topic,
final int numPartitions,
final short replicationFactor,
final Map<String, String> configs
final Map<String, String> configs,
boolean isCompacted
) {
if (isTopicExists(topic)) {
validateTopicProperties(topic, numPartitions, replicationFactor);
return;
}
NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
newTopic.configs(configs);
Map<String, String> newTopicConfigs = new HashMap<>();
newTopicConfigs.putAll(configs);
if (isCompacted) {
newTopicConfigs.put("cleanup.policy", "compact");
}
newTopic.configs(newTopicConfigs);
try {
log.info("Creating topic '{}'", topic);
RetryHelper<Void> retryHelper = new RetryHelper<>();
@@ -129,6 +138,45 @@ public Map<String, TopicDescription> describeTopics(final Collection<String> topicNames) {
}
}



@Override
public TopicCleanupPolicy getTopicCleanupPolicy(String topicName) {
  RetryHelper<Map<ConfigResource, Config>> retryHelper = new RetryHelper<>();
  Map<ConfigResource, Config> configMap;
  try {
    configMap = retryHelper.executeWithRetries(
        () -> adminClient.describeConfigs(Collections.singleton(
            new ConfigResource(ConfigResource.Type.TOPIC, topicName)))
            .all());
  } catch (Exception e) {
    throw new KsqlException("Could not get the topic configs for: " + topicName, e);
  }
  if (configMap == null || configMap.isEmpty()) {
    throw new KsqlException("Could not get the topic configs for: " + topicName);
  }
  Object[] configValues = configMap.values().stream().findFirst().get()
      .entries()
      .stream()
      .filter(configEntry -> configEntry.name().equalsIgnoreCase("cleanup.policy"))
      .toArray();
  // toArray() never returns null, so an empty array is the only "missing" case.
  if (configValues.length == 0) {
    throw new KsqlException("Could not find the cleanup.policy config for topic: " + topicName);
  }
  switch (((ConfigEntry) configValues[0]).value().toLowerCase()) {
    case "compact":
      return TopicCleanupPolicy.COMPACT;
    case "delete":
      return TopicCleanupPolicy.DELETE;
    // Kafka reports a combined policy as a comma-separated list value.
    case "compact,delete":
      return TopicCleanupPolicy.COMPACT_DELETE;
    default:
      throw new KsqlException("Unrecognized cleanup.policy value for topic: " + topicName);
  }
}

@Override
public void deleteTopics(final List<String> topicsToDelete) {
if (!isDeleteTopicEnabled) {
@@ -94,7 +94,7 @@ public void shouldFailDDLStatementIfTopicDoesNotExist() {

@Test
public void shouldEnforceTopicExistenceCorrectly() throws Exception {
topicClient.createTopic("s1_topic", 1, (short) 1);
topicClient.createTopic("s1_topic", 1, (short) 1, false);
StringBuilder runScriptContent =
new StringBuilder("CREATE STREAM S1 (COL1 BIGINT, COL2 VARCHAR) "
+ "WITH (KAFKA_TOPIC = 's1_topic', VALUE_FORMAT = 'JSON');\n");
@@ -83,8 +83,8 @@ public void before() {
new FakeKafkaTopicClient(),
new MockSchemaRegistryClient(),
metaStore);
ksqlEngine.getTopicClient().createTopic("test_topic", 1, (short) 1);
ksqlEngine.getTopicClient().createTopic("test_table", 1, (short) 1);
ksqlEngine.getTopicClient().createTopic("test_topic", 1, (short) 1, false);
ksqlEngine.getTopicClient().createTopic("test_table", 1, (short) 1, false);
}

@After
@@ -64,10 +64,10 @@ public IntegrationTestHarness() {

// Topic generation
public void createTopic(String topicName) {
topicClient.createTopic(topicName, 1, (short) 1);
topicClient.createTopic(topicName, 1, (short) 1, false);
}
public void createTopic(String topicName, int numPartitions, short replicatonFactor) {
topicClient.createTopic(topicName, numPartitions, replicatonFactor);
topicClient.createTopic(topicName, numPartitions, replicatonFactor, false);
}


@@ -19,7 +19,12 @@
import com.google.common.collect.ImmutableList;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -33,6 +38,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlEngine;
@@ -50,12 +56,15 @@
import io.confluent.ksql.util.TopicProducer;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

public class JsonFormatTest {
private static final String inputTopic = "orders_topic";
private static final String inputStream = "ORDERS";
private static final String usersTopic = "users_topic";
private static final String usersTable = "USERS";
private static final String messageLogTopic = "log_topic";
private static final String messageLogStream = "message_log";

@@ -92,8 +101,9 @@ public void before() throws Exception {
}

private void createInitTopics() {
topicClient.createTopic(inputTopic, 1, (short)1);
topicClient.createTopic(messageLogTopic, 1, (short)1);
topicClient.createTopic(inputTopic, 1, (short)1, false);
topicClient.createTopic(usersTopic, 1, (short)1, false);
topicClient.createTopic(messageLogTopic, 1, (short)1, false);
}

private void produceInitData() throws Exception {
@@ -121,10 +131,16 @@ private void execInitCreateStreamQueries() throws Exception {
+ "kafka_topic='%s' , "
+ "key='ordertime');", inputStream, inputTopic);

String usersTableStr = String.format("CREATE TABLE %s (userid varchar, age integer) WITH "
+ "(value_format = 'json', kafka_topic='%s', "
+ "KEY='userid');",
usersTable, usersTopic);

String messageStreamStr = String.format("CREATE STREAM %s (message varchar) WITH (value_format = 'json', "
+ "kafka_topic='%s');", messageLogStream, messageLogTopic);

ksqlEngine.buildMultipleQueries(ordersStreamStr, Collections.emptyMap());
ksqlEngine.buildMultipleQueries(usersTableStr, Collections.emptyMap());
ksqlEngine.buildMultipleQueries(messageStreamStr, Collections.emptyMap());
}

@@ -187,6 +203,26 @@ public void testSinkProperties() throws Exception {
assertThat(
topicClient.describeTopics(ImmutableList.of(streamName)).get(streamName).partitions(),
hasSize(3));
assertThat(topicClient.getTopicCleanupPolicy(streamName), equalTo(
KafkaTopicClient.TopicCleanupPolicy.DELETE));
}

@Test
public void testTableSinkCleanupProperty() throws Exception {
final String tableName = "SinkCleanupTable".toUpperCase();
final String queryString = String.format("CREATE TABLE %s AS SELECT * "
+ "FROM %s;",
tableName, usersTable);
executePersistentQuery(queryString);

TestUtils.waitForCondition(
() -> topicClient.isTopicExists(tableName),
"Wait for async topic creation"
);

assertThat(topicClient.getTopicCleanupPolicy(tableName), equalTo(
KafkaTopicClient.TopicCleanupPolicy.COMPACT));
}

@Test
@@ -392,7 +392,7 @@ private void produceInitData() throws Exception {
return;
}

topicClient.createTopic(INPUT_TOPIC, 1, (short) 1);
topicClient.createTopic(INPUT_TOPIC, 1, (short) 1, false);

final OrderDataProvider orderDataProvider = new OrderDataProvider();

@@ -9,7 +9,10 @@
import io.confluent.ksql.util.QueryMetadata;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.kstream.Windowed;
@@ -26,7 +29,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;

@@ -111,6 +116,8 @@ public void shouldAggregateWithNoWindow() throws Exception {

assertThat("Expected to see 3 topics after clean up but seeing " + topicsAfterCleanUp.size
(), topicsAfterCleanUp.size(), equalTo(3));
assertThat(topicClient.getTopicCleanupPolicy(streamName), equalTo(
KafkaTopicClient.TopicCleanupPolicy.COMPACT));
}


@@ -157,6 +164,8 @@ public void shouldAggregateTumblingWindow() throws Exception {

assertThat("Expected to see 3 topics after clean up but seeing " + topicsAfterCleanUp.size
(), topicsAfterCleanUp.size(), equalTo(3));
assertThat(topicClient.getTopicCleanupPolicy(streamName), equalTo(
KafkaTopicClient.TopicCleanupPolicy.DELETE));
}

private void updateResults(Map<String, GenericRow> results, Map<Windowed<String>, GenericRow> windowedResults) {
@@ -209,6 +218,8 @@ public void shouldAggregateHoppingWindow() throws Exception {

assertThat("Expected to see 3 topics after clean up but seeing " + topicsAfterCleanUp.size
(), topicsAfterCleanUp.size(), equalTo(3));
assertThat(topicClient.getTopicCleanupPolicy(streamName), equalTo(
KafkaTopicClient.TopicCleanupPolicy.DELETE));
}

@Test
@@ -256,6 +267,8 @@ public void shouldAggregateSessionWindow() throws Exception {

assertThat("Expected to see 3 topics after clean up but seeing " + topicsAfterCleanUp.size
(), topicsAfterCleanUp.size(), equalTo(3));
assertThat(topicClient.getTopicCleanupPolicy(streamName), equalTo(
KafkaTopicClient.TopicCleanupPolicy.DELETE));

}
