From 3454a9671f7245cbdb2dc8662fb1e0b4e9c41c0e Mon Sep 17 00:00:00 2001 From: mkandaswamy Date: Thu, 21 Oct 2021 00:29:49 -0700 Subject: [PATCH 1/3] test: Improve the test QuickDegradeAndRestoreCommandTopicIntegrationTest by checking for topic deletion status via assertThatEventually() before checking for degraded state. If the command topic gets recreated somehow and is the reason behind the flakiness, this change helps to isolate that. --- .../QuickDegradeAndRestoreCommandTopicIntegrationTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java index cb619ebb000b..0c0cfb24d1dc 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java @@ -121,8 +121,8 @@ public void shouldBeInDegradeModeAfterCmdTopicDeleteAndRestart() throws Excepti REST_APP.start(); // Then + assertThatEventually("Topic Deleted", this::isCommandTopicDeleted, is(true)); assertThatEventually("Degraded State", this::isDegradedState, is(true)); - assertThat(TEST_HARNESS.topicExists(commandTopic), is(false)); REST_APP.stop(); KsqlRestoreCommandTopic.main( new String[]{ @@ -139,6 +139,10 @@ public void shouldBeInDegradeModeAfterCmdTopicDeleteAndRestart() throws Excepti assertThatEventually("Degraded State", this::isDegradedState, is(false)); } + private boolean isCommandTopicDeleted() { + return !TEST_HARNESS.topicExists(commandTopic); + } + private boolean isDegradedState() { // If in degraded state, then the following command will return a warning final List response = makeKsqlRequest( From 4afc2b90ed8ada83eb7d8de6a82669b3fe0b37d3 Mon Sep 17 00:00:00 2001 From: mkandaswamy Date: Thu, 21 Oct 2021 12:52:44 -0700 Subject: [PATCH 2/3] Fix a race condition in QuickDegradeAndRestoreCommandTopicIntegrationTest which lead to automatic recreation of deleted command topic by setting Kafka config: auto.create.topics.enable to false. Also, added another check to ensure command topic is deleted before stopping ksqldb. --- .../QuickDegradeAndRestoreCommandTopicIntegrationTest.java | 1 + .../ksql/test/util/EmbeddedSingleNodeKafkaCluster.java | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java index 0c0cfb24d1dc..804230cba1e6 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java @@ -117,6 +117,7 @@ public void shouldBeInDegradeModeAfterCmdTopicDeleteAndRestart() throws Excepti // When // Delete the command topic and restart TEST_HARNESS.deleteTopics(Collections.singletonList(commandTopic)); + assertThatEventually("Topic Deleted", this::isCommandTopicDeleted, is(true)); REST_APP.stop(); REST_APP.start(); diff --git a/ksqldb-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java b/ksqldb-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java index c1df4060555d..b4fe5e5a2ac1 100644 --- a/ksqldb-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java +++ b/ksqldb-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java @@ -576,8 +576,9 @@ private Properties buildBrokerConfig(final String logDir) { // Need to know where ZK is: config.put(KafkaConfig.ZkConnectProp(), zookeeper.connectString()); config.put(AclAuthorizer.ZkUrlProp(), zookeeper.connectString()); - // Do not require tests to explicitly create tests: - config.put(KafkaConfig.AutoCreateTopicsEnableProp(), true); + // Create topics explicitly when needed to avoid a race which + // automatically recreates deleted command topic: + config.put(KafkaConfig.AutoCreateTopicsEnableProp(), false); // Default to small number of partitions for auto-created topics: config.put(KafkaConfig.NumPartitionsProp(), 1); // Allow tests to delete topics: From c2f61a31a505cef0d415aad9fe11fca9172c75b2 Mon Sep 17 00:00:00 2001 From: mkandaswamy Date: Thu, 21 Oct 2021 15:57:07 -0700 Subject: [PATCH 3/3] Restore the default to true for KafkaConfig: auto.create.topics.enable as few tests depend on that setting. Also, added new method withoutAutoCreateTopics() to create Kafka cluster with auto create topics feature turned off. This method is used by QuickDegradeAndRestoreCommandTopicIntegrationTest to fix a race which lead to automatic recreation of deleted command topic. --- ...kDegradeAndRestoreCommandTopicIntegrationTest.java | 7 ++++++- .../test/util/EmbeddedSingleNodeKafkaCluster.java | 11 ++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java index 804230cba1e6..d697178ecf36 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java @@ -17,6 +17,7 @@ import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; import io.confluent.ksql.rest.server.TestKsqlRestApp; import io.confluent.ksql.rest.server.restore.KsqlRestoreCommandTopic; +import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster; import io.confluent.ksql.test.util.KsqlTestFolder; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.ReservedInternalTopics; @@ -44,7 +45,11 @@ @Category({IntegrationTest.class}) public class QuickDegradeAndRestoreCommandTopicIntegrationTest { - private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); + private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.builder() + .withKafkaCluster( + EmbeddedSingleNodeKafkaCluster.newBuilder() + .withoutAutoCreateTopics() + ).build(); @ClassRule public static final RuleChain CHAIN = RuleChain diff --git a/ksqldb-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java b/ksqldb-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java index b4fe5e5a2ac1..1416346abf75 100644 --- a/ksqldb-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java +++ b/ksqldb-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java @@ -576,9 +576,6 @@ private Properties buildBrokerConfig(final String logDir) { // Need to know where ZK is: config.put(KafkaConfig.ZkConnectProp(), zookeeper.connectString()); config.put(AclAuthorizer.ZkUrlProp(), zookeeper.connectString()); - // Create topics explicitly when needed to avoid a race which - // automatically recreates deleted command topic: - config.put(KafkaConfig.AutoCreateTopicsEnableProp(), false); // Default to small number of partitions for auto-created topics: config.put(KafkaConfig.NumPartitionsProp(), 1); // Allow tests to delete topics: @@ -695,6 +692,14 @@ public static final class Builder { brokerConfig.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp(), true); brokerConfig.put(KafkaConfig.ListenersProp(), "PLAINTEXT://:0"); + brokerConfig.put(KafkaConfig.AutoCreateTopicsEnableProp(), true); + } + + public Builder withoutAutoCreateTopics() { + // Create topics explicitly when needed to avoid a race which + // automatically recreates deleted topic: + brokerConfig.put(KafkaConfig.AutoCreateTopicsEnableProp(), false); + return this; } public Builder withoutPlainListeners() {