-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Set the clean up policy to compact for non-windowed tables. #1042
Set the clean up policy to compact for non-windowed tables. #1042
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good.
My main comment is that we need to add unit test cases in KSQLStructuredDataOutputNodeTest
which verifies that non windowed tables actually are created with compaction enabled, that windowed ktables are not created with compaction, and that kstreams are not created with compaction.
ksqlConfig, | ||
kafkaTopicClient, | ||
((result instanceof SchemaKTable) | ||
&& !((SchemaKTable) result).isWindowed())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would move this boolean to a helper method named shouldCompact
. Makes the intent much clearer.
@@ -129,6 +137,12 @@ public boolean isTopicExists(final String topic) { | |||
} | |||
} | |||
|
|||
@Override | |||
public DescribeConfigsResult describeConfigs(String topicName) { | |||
return adminClient.describeConfigs( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should use the RetryHelper.executeWithRetries
method. Otherwise the command will fail for even retriable exceptions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed the method to use RetryHelper and return the clean up policy only.
@@ -187,6 +203,44 @@ public void testSinkProperties() throws Exception { | |||
assertThat( | |||
topicClient.describeTopics(ImmutableList.of(streamName)).get(streamName).partitions(), | |||
hasSize(3)); | |||
Map<ConfigResource, Config> configResourceConfigMap = topicClient.describeConfigs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This block is used 3 times. Should be moved to a utility method named verifyCleanupPolicy(String policy)
. Then you can get the property from the cluster and verify that it matches the expectation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the topic client api change this will be one line only.
@@ -92,7 +93,7 @@ public void before() { | |||
props.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 4); | |||
props.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short)3); | |||
createOutputNode(props); | |||
topicClient.createTopic(eq("output"), anyInt(), anyShort()); | |||
topicClient.createTopic(eq("output"), anyInt(), anyShort(), anyBoolean()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add a unit test here that verifies that non windowed KTables actually are being created with cleanup.polic=compact
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a unit test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look good. Left some minor comments inline
} | ||
|
||
@Test | ||
public void testSinkCleanupPolicy() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename to testTableSinkCleanupProperty
.filter | ||
(configEntry -> configEntry.name().equalsIgnoreCase("cleanup.policy")).toArray(); | ||
assertThat(configEntries[0], instanceOf(ConfigEntry.class)); | ||
assertThat(((ConfigEntry) configEntries[0]).value(), equalTo("compact")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a similar check to shouldAggregateTumblingWindow that verifies that the policy is "delete"?
DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class); | ||
ConfigEntry configEntry = new ConfigEntry("delete.topic.enable", "true"); | ||
ConfigEntry configEntryDeleteEnable = new ConfigEntry("delete.topic.enable", "true"); | ||
ConfigEntry configEntryIsCompacted = new ConfigEntry("cleanup.policy", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this returned config entry actually used anywhere in admin client? The only usage of describeConfigs I see is to get the broker's delete.topic.enable config. If cleanup.policy is not read anywhere can we just get rid of it and the isCompacted parameter to this function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it was only used in tests. Removed it.
@@ -74,11 +73,12 @@ void createTopic( | |||
Map<String, TopicDescription> describeTopics(Collection<String> topicNames); | |||
|
|||
/** | |||
* [warn] synchronous call to get the response | |||
* [warn] Asynchronous call to get the response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this call looks synchronous to me
new FunctionRegistry(), | ||
new HashMap<>(), | ||
new MockSchemaRegistryClient()); | ||
topicClientForNonWindowTable.createTopic(eq("output"), anyInt(), anyShort(), eq(true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this plus the next 2 lines come before buildStream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, indeed this needed more changes.
@@ -74,11 +73,12 @@ void createTopic( | |||
Map<String, TopicDescription> describeTopics(Collection<String> topicNames); | |||
|
|||
/** | |||
* [warn] synchronous call to get the response | |||
* [warn] Asynchronous call to get the response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a synchronous call. It blocks until it gets a successful response or will throw an exception if it didn't get a successful response after 5 retries.
* | ||
* @param topicName topicName to describe | ||
* @param topicName topicNames to describe |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why make this plural. It is still a single topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This calls AdminClient.describeTopics
which gets a collection of topics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. But this method just takes a single topic and passes a list with a single element and also assumes the output contains a single pair.
So it is more appropriate for javadoc for this method to be singular rather than plural.
*/ | ||
DescribeConfigsResult describeConfigs(String topicName); | ||
public String getTopicCleanupPolicy(String topicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be better if this was an enum with three values: delete
, compact
, compact+delete
. Would be better than each user of the method relying on the actual string literals.
Optional.empty()); | ||
|
||
StreamsBuilder streamsBuilder = new StreamsBuilder(); | ||
SchemaKTable schemaKTable = (SchemaKTable) outputNode.buildStream(streamsBuilder, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should first have a check assertThat(scheamKstream, instanceof(schemaKTable))
and then do the cast.
new FunctionRegistry(), | ||
new HashMap<>(), | ||
new MockSchemaRegistryClient()); | ||
topicClientForNonWindowTable.createTopic(eq("output"), anyInt(), anyShort(), eq(true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be better to have 4
and3
instead fo anyInt()
and anyShort()
.
new MockSchemaRegistryClient()); | ||
topicClientForNonWindowTable.createTopic(eq("output"), anyInt(), anyShort(), eq(true)); | ||
EasyMock.expectLastCall(); | ||
EasyMock.replay(topicClientForNonWindowTable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems off.
We should be doing:
topicClientForNonWindowTable.createTopic(...)
Easymock.expectLastCall();
EasyMock.replay(topicClientForNonWindownTable);
outputNode.buildStream(...);
EasyMock.verify(topicClientForNonWindowTable);
This way we are actually checking that the right call was made to the topic client when trying to create the sink topic.
Also, we should add tests which check that the compaction policy is delete for streams and windowed tables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made changes and added tests for both windowed table and stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost there. My main comment is that we should start using the values of the new enums in the tests. In fact, I am not sure how the code compiled.
"Wait for async topic creation" | ||
); | ||
|
||
assertThat(topicClient.getTopicCleanupPolicy(tableName), equalTo("compact")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here and elsewhere we should be checking equivalence to TopicCleanupPolicy.COMPACT
etc.
topicClientForNonWindowTable.createTopic("output", 4, (short) 3, true); | ||
EasyMock.replay(topicClientForNonWindowTable); | ||
SchemaKStream schemaKStream = outputNode.buildStream(streamsBuilder, | ||
ksqlConfig, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation.
schema); | ||
|
||
return new KsqlStructuredDataOutputNode(new PlanNodeId("0"), | ||
tableSourceNode, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation.
@@ -64,24 +66,31 @@ public KafkaTopicClientImpl(final AdminClient adminClient) { | |||
public void createTopic( | |||
final String topic, | |||
final int numPartitions, | |||
final short replicatonFactor | |||
final short replicatonFactor, | |||
boolean isCompacted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use the TopicCleanupPolicy here instead of isCompacted. Also, wouldn't it be better to use compact+delete for topics backing windowed aggregations?
Both of these changes can be made in a later PR targeting master
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good. We should cleanup the KafkaTopicClientTest before merging: some of those changes seem redundant and were perhaps added during an early iteration.
@@ -316,12 +317,13 @@ private DescribeClusterResult getDescribeClusterResult() { | |||
return describeClusterResult; | |||
} | |||
|
|||
private DescribeConfigsResult getDescribeConfigsResult() { | |||
private DescribeConfigsResult getDescribeConfigsResult(boolean isCompacted) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change? This variable doesn't seem to be used at all and really doesn't need to be changed relative to the other changes in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it was left from previous changes. Removed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm modulo one nit
.stream() | ||
.filter(configEntry -> configEntry.name().equalsIgnoreCase("cleanup.policy")) | ||
.toArray(); | ||
if (configValues == null || configValues.length ==0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: == 0
Currently we don't set the clean up policy for sink topics and it uses the default which is
delete
. For the non-window tables we should usecompact
since these are changelog topics.For the windowed tables we still use
delete
clean up policy since we would prefer to not keep the windowed table forever!