Set the cleanup policy to compact for non-windowed tables. #1042

Merged
@@ -136,8 +136,7 @@ public SchemaKStream buildStream(
       createSinkTopic(noRowKey.getKafkaTopicName(),
                       ksqlConfig,
                       kafkaTopicClient,
-                      ((result instanceof SchemaKTable)
-                          && !((SchemaKTable) result).isWindowed()));
+                      shouldBeCompacted(result));
       result.into(
           noRowKey.getKafkaTopicName(),
           noRowKey.getKsqlTopic().getKsqlTopicSerDe()
@@ -153,6 +152,11 @@ public SchemaKStream buildStream(
     return result;
   }

+  private boolean shouldBeCompacted(SchemaKStream result) {
+    return (result instanceof SchemaKTable)
+        && !((SchemaKTable) result).isWindowed();
+  }

   private SchemaKStream createOutputStream(
       final SchemaKStream schemaKStream,
       final KsqlStructuredDataOutputNode.Builder outputNodeBuilder,
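Taken together, the change above means that when a query's sink is a non-windowed table, KSQL now creates the sink topic with cleanup.policy=compact, so the broker retains the latest value per key instead of expiring records purely by retention time. As a hedged illustration of what such a topic creation boils down to at the Kafka level — a minimal sketch with the plain AdminClient; the topic name, partition/replica counts, and broker address are assumptions, not values from this PR:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public final class CompactedSinkTopicSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        try (AdminClient admin = AdminClient.create(props)) {
          NewTopic sink = new NewTopic("table-sink", 4, (short) 3) // assumed name/partitions/replicas
              // A non-windowed table is a changelog keyed by row key:
              // compaction keeps the latest value per key.
              .configs(Collections.singletonMap(
                  TopicConfig.CLEANUP_POLICY_CONFIG,     // "cleanup.policy"
                  TopicConfig.CLEANUP_POLICY_COMPACT));  // "compact"
          admin.createTopics(Collections.singleton(sink)).all().get(); // block until created
        }
      }
    }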
@@ -16,7 +16,6 @@

package io.confluent.ksql.util;

-import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.TopicDescription;

import java.io.Closeable;
@@ -74,11 +73,12 @@ void createTopic(
Map<String, TopicDescription> describeTopics(Collection<String> topicNames);

   /**
-   * [warn] synchronous call to get the response
+   * [warn] Asynchronous call to get the response
[Contributor] This is a synchronous call. It blocks until it gets a successful response or will throw an exception if it didn't get a successful response after 5 retries.

[Contributor] This call looks synchronous to me.

    *
-   * @param topicName topicName to describe
+   * @param topicName topicNames to describe
[Contributor] Why make this plural? It is still a single topic.

[Contributor Author] This calls AdminClient.describeTopics, which takes a collection of topics.

[Contributor] Yes, but this method takes a single topic, passes a list with a single element, and assumes the output contains a single pair, so it is more appropriate for this method's javadoc to be singular rather than plural.

    */
-  DescribeConfigsResult describeConfigs(String topicName);
+  public String getTopicCleanupPolicy(String topicName);
[Contributor] It would be better if this was an enum with three values: delete, compact, and compact+delete, rather than each user of the method relying on the actual string literals.
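A hedged sketch of what the suggested enum could look like (illustrative names only, not code from this PR):

    // Illustrative sketch of the reviewer's suggestion:
    // a typed cleanup policy instead of raw string literals.
    public enum TopicCleanupPolicy {
      DELETE,
      COMPACT,
      COMPACT_DELETE;

      // Maps a raw "cleanup.policy" config value onto the enum.
      public static TopicCleanupPolicy from(final String configValue) {
        switch (configValue.toLowerCase()) {
          case "delete":
            return DELETE;
          case "compact":
            return COMPACT;
          case "compact,delete":
          case "delete,compact":
            return COMPACT_DELETE;
          default:
            throw new IllegalArgumentException("Unknown cleanup.policy: " + configValue);
        }
      }
    }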



   /**
    * Delete the topics in the given list.
@@ -18,6 +18,7 @@

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
@@ -137,10 +138,34 @@ public Map<String, TopicDescription> describeTopics(final Collection<String> top
}
}



   @Override
-  public DescribeConfigsResult describeConfigs(String topicName) {
-    return adminClient.describeConfigs(
-        Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));
+  public String getTopicCleanupPolicy(String topicName) {
+    RetryHelper<Map<ConfigResource, Config>> retryHelper = new RetryHelper<>();
+    Map<ConfigResource, Config> configMap = null;
+    try {
+      // Blocks until describeConfigs succeeds, retrying on failure.
+      configMap = retryHelper.executeWithRetries(
+          () -> {
+            return adminClient.describeConfigs(Collections.singleton(
+                new ConfigResource(ConfigResource.Type.TOPIC, topicName)))
+                .all();
+          });
+    } catch (Exception e) {
+      throw new KsqlException("Could not get the topic configs for : " + topicName, e);
+    }
+    if (configMap == null) {
+      throw new KsqlException("Could not get the topic configs for : " + topicName);
+    }
+    // Keep only the cleanup.policy entry from the single returned Config.
+    Object[] configValues = configMap.values().stream().findFirst().get()
+        .entries()
+        .stream()
+        .filter(configEntry -> configEntry.name().equalsIgnoreCase("cleanup.policy"))
+        .toArray();
+    if (configValues == null || configValues.length ==0) {
[Contributor] nit: == 0
+      throw new KsqlException("Could not get the topic configs for : " + topicName);
+    }
+    return ((ConfigEntry) configValues[0]).value();
+  }
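To make the synchronous/retry discussion above concrete: a hedged sketch of what a retry wrapper like RetryHelper.executeWithRetries could look like (the real KSQL class is not shown in this diff and may differ; the five-attempt count is taken from the review comment above):

    import java.util.function.Supplier;

    import org.apache.kafka.common.KafkaFuture;

    // Illustrative sketch only: each attempt blocks on KafkaFuture.get(), which is
    // what makes the overall call synchronous from the caller's point of view.
    public class RetryHelper<T> {

      private static final int MAX_RETRIES = 5; // assumed; the count quoted in the review above

      public T executeWithRetries(final Supplier<KafkaFuture<T>> call) throws Exception {
        Exception lastException = null;
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
          try {
            return call.get().get(); // block until the broker responds or the future fails
          } catch (Exception e) {
            lastException = e; // remember the failure and try again
          }
        }
        throw lastException; // give up after exhausting the retries
      }
    }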

@Override
@@ -133,7 +133,7 @@ private void execInitCreateStreamQueries() throws Exception {

     String usersTableStr = String.format("CREATE TABLE %s (userid varchar, age integer) WITH "
         + "(value_format = 'json', kafka_topic='%s', "
-        + "key='userid');",
+        + "KEY='userid');",
         usersTable, usersTopic);

String messageStreamStr = String.format("CREATE STREAM %s (message varchar) WITH (value_format = 'json', "
@@ -203,21 +203,11 @@ public void testSinkProperties() throws Exception {
     assertThat(
         topicClient.describeTopics(ImmutableList.of(streamName)).get(streamName).partitions(),
         hasSize(3));
-    Map<ConfigResource, Config> configResourceConfigMap = topicClient.describeConfigs
-        (streamName).all().get(1000, TimeUnit.MILLISECONDS);
-    assertThat(configResourceConfigMap.values().size(), equalTo(1));
-    Object[] configEntries =
-        configResourceConfigMap.values().stream().findFirst().get()
-            .entries()
-            .stream()
-            .filter(configEntry -> configEntry.name().equalsIgnoreCase("cleanup.policy"))
-            .toArray();
-    assertThat(configEntries[0], instanceOf(ConfigEntry.class));
-    assertThat(((ConfigEntry) configEntries[0]).value(), equalTo("delete"));
+    assertThat(topicClient.getTopicCleanupPolicy(streamName), equalTo("delete"));
}

   @Test
-  public void testSinkCleanupPolicy() throws Exception {
+  public void testTableSinkCleanupProperty() throws Exception {
     final String tableName = "SinkCleanupTable".toUpperCase();
     final int resultPartitionCount = 3;
     final String queryString = String.format("CREATE TABLE %s AS SELECT * "
@@ -230,17 +220,7 @@ public void testSinkCleanupPolicy() throws Exception {
"Wait for async topic creation"
);

-    Map<ConfigResource, Config> configResourceConfigMap = topicClient.describeConfigs
-        (tableName).all().get(1000, TimeUnit.MILLISECONDS);
-    assertThat(configResourceConfigMap.values().size(), equalTo(1));
-    Object[] configEntries =
-        configResourceConfigMap.values().stream().findFirst().get()
-            .entries()
-            .stream()
-            .filter(configEntry -> configEntry.name().equalsIgnoreCase("cleanup.policy"))
-            .toArray();
-    assertThat(configEntries[0], instanceOf(ConfigEntry.class));
-    assertThat(((ConfigEntry) configEntries[0]).value(), equalTo("compact"));
+    assertThat(topicClient.getTopicCleanupPolicy(tableName), equalTo("compact"));
[Contributor] Here and elsewhere we should be checking equivalence to TopicCleanupPolicy.COMPACT etc.

}

@Test
@@ -116,18 +116,7 @@ public void shouldAggregateWithNoWindow() throws Exception {

assertThat("Expected to see 3 topics after clean up but seeing " + topicsAfterCleanUp.size
(), topicsAfterCleanUp.size(), equalTo(3));

Map<ConfigResource, Config> configResourceConfigMap = topicClient.describeConfigs
(streamName).all().get(1000, TimeUnit.MILLISECONDS);
assertThat(configResourceConfigMap.values().size(), equalTo(1));
Object[] configEntries =
configResourceConfigMap.values().stream().findFirst().get()
.entries()
.stream()
.filter
(configEntry -> configEntry.name().equalsIgnoreCase("cleanup.policy")).toArray();
assertThat(configEntries[0], instanceOf(ConfigEntry.class));
assertThat(((ConfigEntry) configEntries[0]).value(), equalTo("compact"));
assertThat(topicClient.getTopicCleanupPolicy(streamName), equalTo("compact"));
}


@@ -174,6 +163,7 @@ public void shouldAggregateTumblingWindow() throws Exception {

assertThat("Expected to see 3 topics after clean up but seeing " + topicsAfterCleanUp.size
(), topicsAfterCleanUp.size(), equalTo(3));
assertThat(topicClient.getTopicCleanupPolicy(streamName), equalTo("delete"));
}

private void updateResults(Map<String, GenericRow> results, Map<Windowed<String>, GenericRow> windowedResults) {
@@ -226,6 +216,7 @@ public void shouldAggregateHoppingWindow() throws Exception {

assertThat("Expected to see 3 topics after clean up but seeing " + topicsAfterCleanUp.size
(), topicsAfterCleanUp.size(), equalTo(3));
assertThat(topicClient.getTopicCleanupPolicy(streamName), equalTo("delete"));
}

@Test
@@ -273,6 +264,7 @@ public void shouldAggregateSessionWindow() throws Exception {

assertThat("Expected to see 3 topics after clean up but seeing " + topicsAfterCleanUp.size
(), topicsAfterCleanUp.size(), equalTo(3));
assertThat(topicClient.getTopicCleanupPolicy(streamName), equalTo("delete"));

}

@@ -37,10 +37,12 @@
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.KsqlStream;
+import io.confluent.ksql.metastore.KsqlTable;
import io.confluent.ksql.metastore.KsqlTopic;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe;
import io.confluent.ksql.structured.SchemaKStream;
+import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;

@@ -189,4 +191,47 @@ private SchemaKStream buildStream() {
new HashMap<>(), new MockSchemaRegistryClient());
}

+  @Test
+  public void shouldCreateSinkWithCorrectCleanupPolicy() {
+    KafkaTopicClient topicClientForNonWindowTable = EasyMock.createNiceMock(KafkaTopicClient.class);
+    final Map<String, Object> props = new HashMap<>();
+    props.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 4);
+    props.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short) 3);
+
+    StructuredDataSourceNode tableSourceNode = new StructuredDataSourceNode(
+        new PlanNodeId("0"),
+        new KsqlTable("sqlExpression", "datasource",
+            schema,
+            schema.field("key"),
+            schema.field("timestamp"),
+            new KsqlTopic("input", "input",
+                new KsqlJsonTopicSerDe()),
+            "TableStateStore",
+            false),
+        schema);
+
+    outputNode = new KsqlStructuredDataOutputNode(new PlanNodeId("0"),
+        tableSourceNode,
[Contributor] Indentation.

+        schema,
+        schema.field("timestamp"),
+        schema.field("key"),
+        new KsqlTopic("output", "output", new KsqlJsonTopicSerDe()),
+        "output",
+        props,
+        Optional.empty());

+    StreamsBuilder streamsBuilder = new StreamsBuilder();
+    SchemaKTable schemaKTable = (SchemaKTable) outputNode.buildStream(streamsBuilder,
[Contributor] We should first have a check assertThat(schemaKStream, instanceOf(SchemaKTable.class)) and then do the cast.

+        ksqlConfig,
+        topicClientForNonWindowTable,
+        new MetastoreUtil(),
+        new FunctionRegistry(),
+        new HashMap<>(),
+        new MockSchemaRegistryClient());
+    topicClientForNonWindowTable.createTopic(eq("output"), anyInt(), anyShort(), eq(true));
[Contributor] Would be better to have 4 and 3 instead of anyInt() and anyShort().

[Contributor] Shouldn't this plus the next 2 lines come before buildStream?

[Contributor Author] Yes, indeed this needed more changes.

+    EasyMock.expectLastCall();
+    EasyMock.replay(topicClientForNonWindowTable);
[Contributor — @apurvam, Mar 26, 2018] This seems off. We should be doing:

    topicClientForNonWindowTable.createTopic(...);
    EasyMock.expectLastCall();
    EasyMock.replay(topicClientForNonWindowTable);
    outputNode.buildStream(...);
    EasyMock.verify(topicClientForNonWindowTable);

This way we are actually checking that the right call was made to the topic client when trying to create the sink topic. Also, we should add tests which check that the compaction policy is delete for streams and windowed tables.

[Contributor Author] Made changes and added tests for both windowed table and stream.


+  }

}
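For readers unfamiliar with EasyMock, this is the record → replay → exercise → verify lifecycle the reviewer describes, in a minimal self-contained form (the TopicClient interface here is a hypothetical stand-in, not KSQL's KafkaTopicClient):

    import org.easymock.EasyMock;

    public class ReplayOrderSketch {

      // Hypothetical stand-in for KafkaTopicClient, only to keep the example self-contained.
      interface TopicClient {
        void createTopic(String name, int partitions, short replicas, boolean compacted);
      }

      public static void main(String[] args) {
        TopicClient client = EasyMock.createMock(TopicClient.class);

        // 1. Record the expected interaction while the mock is in record mode.
        client.createTopic("output", 4, (short) 3, true);
        EasyMock.expectLastCall();

        // 2. Switch to replay mode *before* exercising the code under test.
        EasyMock.replay(client);

        // 3. Exercise: here the code under test is simulated by a direct call.
        client.createTopic("output", 4, (short) 3, true);

        // 4. Verify that every recorded expectation was met.
        EasyMock.verify(client);
      }
    }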
@@ -101,7 +101,7 @@ public Map<String, TopicDescription> describeTopics(Collection<String> topicName
}

   @Override
-  public DescribeConfigsResult describeConfigs(String topicName) {
+  public String getTopicCleanupPolicy(String topicName) {
     return null;
   }

@@ -84,7 +84,7 @@ public void testCreateTopic() {
}

   @Test
-  public void shouldUseExistingTopicWIthTheSameSpecsInsteadOfCreate() {
+  public void shouldUseExistingTopicWithTheSameSpecsInsteadOfCreate() {
AdminClient adminClient = mock(AdminClient.class);
expect(adminClient.describeCluster()).andReturn(getDescribeClusterResult());
expect(adminClient.listTopics()).andReturn(getListTopicsResult());
@@ -320,11 +320,8 @@ private DescribeClusterResult getDescribeClusterResult() {
private DescribeConfigsResult getDescribeConfigsResult(boolean isCompacted) {
[Contributor] Why this change? This variable doesn't seem to be used at all and really doesn't need to be changed relative to the other changes in this PR.

[Contributor Author] Yes, it was left from previous changes. Removed it.

     DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);
     ConfigEntry configEntryDeleteEnable = new ConfigEntry("delete.topic.enable", "true");
-    ConfigEntry configEntryIsCompacted = new ConfigEntry("cleanup.policy",
-        isCompacted ? "compact" : "delete");
     List<ConfigEntry> configEntries = new ArrayList<>();
     configEntries.add(configEntryDeleteEnable);
-    configEntries.add(configEntryIsCompacted);
     Map<ConfigResource, Config> config = new HashMap<>();
     config.put(new ConfigResource(ConfigResource.Type.BROKER, "1"), new Config(configEntries));
     expect(describeConfigsResult.all()).andReturn(KafkaFuture.completedFuture(config));
@@ -65,7 +65,7 @@ public Map<String, TopicDescription> describeTopics(Collection<String> topicName


@Override
-  public DescribeConfigsResult describeConfigs(String topicName) {
+  public String getTopicCleanupPolicy(String topicName) {
     return null;
   }
