diff --git a/systemtest/src/test/java/io/strimzi/systemtest/AbstractST.java b/systemtest/src/test/java/io/strimzi/systemtest/AbstractST.java index 59347626e98..5f453d4a462 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/AbstractST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/AbstractST.java @@ -25,8 +25,6 @@ import io.strimzi.systemtest.resources.operator.SetupClusterOperator; import io.strimzi.systemtest.storage.TestStorage; import io.strimzi.systemtest.utils.StUtils; -import io.strimzi.systemtest.utils.kafkaUtils.KafkaTopicUtils; -import io.strimzi.systemtest.utils.kafkaUtils.KafkaUserUtils; import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils; import io.strimzi.test.TestUtils; import io.strimzi.test.interfaces.TestSeparator; @@ -48,7 +46,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -83,7 +80,6 @@ public abstract class AbstractST implements TestSeparator { private static final Object LOCK = new Object(); protected static ConcurrentHashMap storageMap = new ConcurrentHashMap<>(); - protected static final String CLUSTER_NAME_PREFIX = "my-cluster-"; protected static final String KAFKA_IMAGE_MAP = "STRIMZI_KAFKA_IMAGES"; protected static final String KAFKA_CONNECT_IMAGE_MAP = "STRIMZI_KAFKA_CONNECT_IMAGES"; protected static final String KAFKA_MIRROR_MAKER_2_IMAGE_MAP = "STRIMZI_KAFKA_MIRROR_MAKER_2_IMAGES"; @@ -92,12 +88,6 @@ public abstract class AbstractST implements TestSeparator { protected static final String KAFKA_INIT_IMAGE = "STRIMZI_DEFAULT_KAFKA_INIT_IMAGE"; protected static final String TLS_SIDECAR_EO_IMAGE = "STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE"; - public static Random rng = new Random(); - - public static final int MESSAGE_COUNT = TestConstants.MESSAGE_COUNT; - public static final String USER_NAME = KafkaUserUtils.generateRandomNameOfKafkaUser(); - public static 
final String TOPIC_NAME = KafkaTopicUtils.generateRandomNameOfTopic(); - protected void assertResources(String namespace, String podName, String containerName, String memoryLimit, String cpuLimit, String memoryRequest, String cpuRequest) { Pod po = kubeClient(namespace).getPod(namespace, podName); assertThat("Not found an expected Pod " + namespace + "/" + podName + " but found " + diff --git a/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeKafkaExternalListenersST.java b/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeKafkaExternalListenersST.java index 666c3a462ab..6a69f18125f 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeKafkaExternalListenersST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeKafkaExternalListenersST.java @@ -143,7 +143,7 @@ private void testWeirdUsername(ExtensionContext extensionContext, String weirdUs .withConsumerName(ts.getClusterName() + "-" + consumerName) .withBootstrapAddress(KafkaBridgeResources.serviceName(ts.getClusterName())) .withTopicName(ts.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(ts.getMessageCount()) .withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT) .withNamespaceName(ts.getNamespaceName()) .build(); @@ -198,7 +198,7 @@ private void testWeirdUsername(ExtensionContext extensionContext, String weirdUs .withBootstrapAddress(externalBootstrapServers) .withNamespaceName(ts.getNamespaceName()) .withTopicName(ts.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(ts.getMessageCount()) .withUsername(weirdUserName) // we disable ssl.endpoint.identification.algorithm for external listener (i.e., NodePort), // because TLS hostname verification is not supported on such listener type. 
@@ -213,9 +213,9 @@ private void testWeirdUsername(ExtensionContext extensionContext, String weirdUs resourceManager.createResourceWithWait(extensionContext, externalKafkaProducer.producerScramShaTlsStrimzi(ts.getClusterName())); } - ClientUtils.waitForClientSuccess(kafkaProducerExternalName, ts.getNamespaceName(), MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(kafkaProducerExternalName, ts.getNamespaceName(), ts.getMessageCount()); - ClientUtils.waitForClientSuccess(ts.getClusterName() + "-" + consumerName, ts.getNamespaceName(), MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(ts.getClusterName() + "-" + consumerName, ts.getNamespaceName(), ts.getMessageCount()); } @BeforeAll diff --git a/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeST.java b/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeST.java index 9ee3260b268..63554ae3a75 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeST.java @@ -80,7 +80,7 @@ void testSendSimpleMessage(ExtensionContext extensionContext) { .withProducerName(producerName) .withBootstrapAddress(KafkaBridgeResources.serviceName(httpBridgeClusterName)) .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT) .withDelayMs(1000) .withPollInterval(1000) @@ -92,11 +92,11 @@ void testSendSimpleMessage(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, kafkaBridgeClientJob.producerStrimziBridge()); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); KafkaClients kafkaClients = new KafkaClientsBuilder() .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + 
.withMessageCount(testStorage.getMessageCount()) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(httpBridgeClusterName)) .withConsumerName(consumerName) .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) @@ -104,7 +104,7 @@ void testSendSimpleMessage(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, kafkaClients.consumerStrimzi()); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); // Checking labels for KafkaBridge verifyLabelsOnPods(Environment.TEST_SUITE_NAMESPACE, httpBridgeClusterName, "my-bridge", "KafkaBridge"); @@ -124,7 +124,7 @@ void testReceiveSimpleMessage(ExtensionContext extensionContext) { .withConsumerName(consumerName) .withBootstrapAddress(KafkaBridgeResources.serviceName(httpBridgeClusterName)) .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT) .withDelayMs(1000) .withPollInterval(1000) @@ -136,7 +136,7 @@ void testReceiveSimpleMessage(ExtensionContext extensionContext) { // Send messages to Kafka KafkaClients kafkaClients = new KafkaClientsBuilder() .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(httpBridgeClusterName)) .withProducerName(producerName) .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) @@ -144,7 +144,7 @@ void testReceiveSimpleMessage(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, kafkaClients.producerStrimzi()); - ClientUtils.waitForClientsSuccess(producerName, consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientsSuccess(producerName, consumerName, Environment.TEST_SUITE_NAMESPACE, 
testStorage.getMessageCount()); } @ParallelTest diff --git a/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeScramShaST.java b/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeScramShaST.java index b6fd83bf414..fd6d917b21c 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeScramShaST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeScramShaST.java @@ -19,12 +19,14 @@ import io.strimzi.systemtest.kafkaclients.internalClients.BridgeClientsBuilder; import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients; import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClientsBuilder; +import io.strimzi.systemtest.storage.TestStorage; import io.strimzi.systemtest.templates.crd.KafkaBridgeTemplates; import io.strimzi.systemtest.templates.crd.KafkaTemplates; import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates; import io.strimzi.systemtest.templates.crd.KafkaUserTemplates; import io.strimzi.systemtest.utils.ClientUtils; import io.strimzi.systemtest.utils.kafkaUtils.KafkaTopicUtils; +import io.strimzi.systemtest.utils.kafkaUtils.KafkaUserUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,9 +47,11 @@ class HttpBridgeScramShaST extends AbstractST { private static final Logger LOGGER = LogManager.getLogger(HttpBridgeScramShaST.class); private final String httpBridgeScramShaClusterName = "http-bridge-scram-sha-cluster-name"; private BridgeClients kafkaBridgeClientJob; + private final String kafkaUserName = KafkaUserUtils.generateRandomNameOfKafkaUser(); @ParallelTest void testSendSimpleMessageTlsScramSha(ExtensionContext extensionContext) { + final TestStorage testStorage = storageMap.get(extensionContext); final String producerName = "producer-" + new Random().nextInt(Integer.MAX_VALUE); final String consumerName = "consumer-" + new Random().nextInt(Integer.MAX_VALUE); 
@@ -61,23 +65,24 @@ void testSendSimpleMessageTlsScramSha(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(httpBridgeScramShaClusterName, topicName, Environment.TEST_SUITE_NAMESPACE).build()); resourceManager.createResourceWithWait(extensionContext, kafkaBridgeClientJob.producerStrimziBridge()); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); KafkaClients kafkaClients = new KafkaClientsBuilder() .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(httpBridgeScramShaClusterName)) .withConsumerName(consumerName) .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) - .withUsername(USER_NAME) + .withUsername(kafkaUserName) .build(); resourceManager.createResourceWithWait(extensionContext, kafkaClients.consumerScramShaTlsStrimzi(httpBridgeScramShaClusterName)); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @ParallelTest void testReceiveSimpleMessageTlsScramSha(ExtensionContext extensionContext) { + final TestStorage testStorage = storageMap.get(extensionContext); final String producerName = "producer-" + new Random().nextInt(Integer.MAX_VALUE); final String consumerName = "consumer-" + new Random().nextInt(Integer.MAX_VALUE); @@ -87,25 +92,27 @@ void testReceiveSimpleMessageTlsScramSha(ExtensionContext extensionContext) { .withConsumerName(consumerName) .build(); - resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(httpBridgeScramShaClusterName, TOPIC_NAME, Environment.TEST_SUITE_NAMESPACE).build()); + 
resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(httpBridgeScramShaClusterName, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build()); resourceManager.createResourceWithWait(extensionContext, kafkaBridgeClientJob.consumerStrimziBridge()); // Send messages to Kafka KafkaClients kafkaClients = new KafkaClientsBuilder() .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(httpBridgeScramShaClusterName)) .withProducerName(producerName) .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) - .withUsername(USER_NAME) + .withUsername(kafkaUserName) .build(); resourceManager.createResourceWithWait(extensionContext, kafkaClients.producerScramShaTlsStrimzi(httpBridgeScramShaClusterName)); - ClientUtils.waitForClientsSuccess(producerName, consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientsSuccess(producerName, consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @BeforeAll void setUp(ExtensionContext extensionContext) { + final TestStorage testStorage = new TestStorage(extensionContext); + clusterOperator = clusterOperator.defaultInstallation(extensionContext) .createInstallation() .runInstallation(); @@ -131,7 +138,7 @@ void setUp(ExtensionContext extensionContext) { .endSpec().build()); // Create Kafka user - KafkaUser scramShaUser = KafkaUserTemplates.scramShaUser(Environment.TEST_SUITE_NAMESPACE, httpBridgeScramShaClusterName, USER_NAME) + KafkaUser scramShaUser = KafkaUserTemplates.scramShaUser(Environment.TEST_SUITE_NAMESPACE, httpBridgeScramShaClusterName, kafkaUserName) .editMetadata() .withNamespace(Environment.TEST_SUITE_NAMESPACE) .endMetadata() @@ -141,7 +148,7 @@ void setUp(ExtensionContext extensionContext) { // Initialize PasswordSecret to set this as PasswordSecret in MirrorMaker spec PasswordSecretSource passwordSecret = new 
PasswordSecretSource(); - passwordSecret.setSecretName(USER_NAME); + passwordSecret.setSecretName(kafkaUserName); passwordSecret.setPassword("password"); // Initialize CertSecretSource with certificate and Secret names for consumer @@ -160,7 +167,7 @@ void setUp(ExtensionContext extensionContext) { .addToConfig(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") .endConsumer() .withNewKafkaClientAuthenticationScramSha512() - .withUsername(USER_NAME) + .withUsername(kafkaUserName) .withPasswordSecret(passwordSecret) .endKafkaClientAuthenticationScramSha512() .withNewTls() @@ -170,8 +177,8 @@ void setUp(ExtensionContext extensionContext) { kafkaBridgeClientJob = new BridgeClientsBuilder() .withBootstrapAddress(KafkaBridgeResources.serviceName(httpBridgeScramShaClusterName)) - .withTopicName(TOPIC_NAME) - .withMessageCount(MESSAGE_COUNT) + .withTopicName(testStorage.getTopicName()) + .withMessageCount(testStorage.getMessageCount()) .withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT) .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) .build(); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeTlsST.java b/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeTlsST.java index 58f913a4055..0a565f15ccd 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeTlsST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeTlsST.java @@ -18,12 +18,12 @@ import io.strimzi.systemtest.kafkaclients.internalClients.BridgeClientsBuilder; import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients; import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClientsBuilder; +import io.strimzi.systemtest.storage.TestStorage; import io.strimzi.systemtest.templates.crd.KafkaBridgeTemplates; import io.strimzi.systemtest.templates.crd.KafkaTemplates; import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates; import io.strimzi.systemtest.templates.crd.KafkaUserTemplates; import 
io.strimzi.systemtest.utils.ClientUtils; -import io.strimzi.systemtest.utils.kafkaUtils.KafkaTopicUtils; import io.strimzi.systemtest.utils.kafkaUtils.KafkaUserUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.logging.log4j.LogManager; @@ -53,24 +53,23 @@ class HttpBridgeTlsST extends AbstractST { @ParallelTest void testSendSimpleMessageTls(ExtensionContext extensionContext) { + final TestStorage testStorage = storageMap.get(extensionContext); final String producerName = "producer-" + new Random().nextInt(Integer.MAX_VALUE); final String consumerName = "consumer-" + new Random().nextInt(Integer.MAX_VALUE); - // Create topic - String topicName = KafkaTopicUtils.generateRandomNameOfTopic(); BridgeClients kafkaBridgeClientJobProduce = new BridgeClientsBuilder(kafkaBridgeClientJob) - .withTopicName(topicName) + .withTopicName(testStorage.getTopicName()) .withProducerName(producerName) .build(); - resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(httpBridgeTlsClusterName, topicName, Environment.TEST_SUITE_NAMESPACE).build()); + resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(httpBridgeTlsClusterName, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build()); resourceManager.createResourceWithWait(extensionContext, kafkaBridgeClientJobProduce.producerStrimziBridge()); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); KafkaClients kafkaClients = new KafkaClientsBuilder() - .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withTopicName(testStorage.getTopicName()) + .withMessageCount(testStorage.getMessageCount()) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(httpBridgeTlsClusterName)) .withConsumerName(consumerName) .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) @@ 
-78,29 +77,29 @@ void testSendSimpleMessageTls(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, kafkaClients.consumerTlsStrimzi(httpBridgeTlsClusterName)); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @ParallelTest void testReceiveSimpleMessageTls(ExtensionContext extensionContext) { + final TestStorage testStorage = storageMap.get(extensionContext); final String producerName = "producer-" + new Random().nextInt(Integer.MAX_VALUE); final String consumerName = "consumer-" + new Random().nextInt(Integer.MAX_VALUE); - String topicName = KafkaTopicUtils.generateRandomNameOfTopic(); BridgeClients kafkaBridgeClientJobConsume = new BridgeClientsBuilder(kafkaBridgeClientJob) - .withTopicName(topicName) + .withTopicName(testStorage.getTopicName()) .withConsumerName(consumerName) .build(); - resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(httpBridgeTlsClusterName, topicName, Environment.TEST_SUITE_NAMESPACE).build()); + resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(httpBridgeTlsClusterName, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build()); resourceManager.createResourceWithWait(extensionContext, kafkaBridgeClientJobConsume.consumerStrimziBridge()); // Send messages to Kafka KafkaClients kafkaClients = new KafkaClientsBuilder() - .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withTopicName(testStorage.getTopicName()) + .withMessageCount(testStorage.getMessageCount()) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(httpBridgeTlsClusterName)) .withProducerName(producerName) .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) @@ -108,11 +107,12 @@ void testReceiveSimpleMessageTls(ExtensionContext extensionContext) { .build(); 
resourceManager.createResourceWithWait(extensionContext, kafkaClients.producerTlsStrimzi(httpBridgeTlsClusterName)); - ClientUtils.waitForClientsSuccess(producerName, consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientsSuccess(producerName, consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @BeforeAll void setUp(ExtensionContext extensionContext) { + final TestStorage testStorage = new TestStorage(extensionContext); clusterOperator = clusterOperator.defaultInstallation(extensionContext) .createInstallation() .runInstallation(); @@ -171,8 +171,8 @@ void setUp(ExtensionContext extensionContext) { kafkaBridgeClientJob = new BridgeClientsBuilder() .withBootstrapAddress(KafkaBridgeResources.serviceName(httpBridgeTlsClusterName)) - .withTopicName(TOPIC_NAME) - .withMessageCount(MESSAGE_COUNT) + .withTopicName(testStorage.getTopicName()) + .withMessageCount(testStorage.getMessageCount()) .withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT) .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) .build(); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/connect/ConnectBuilderST.java b/systemtest/src/test/java/io/strimzi/systemtest/connect/ConnectBuilderST.java index bece0648c73..5c1a81d6cb5 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/connect/ConnectBuilderST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/connect/ConnectBuilderST.java @@ -145,7 +145,7 @@ class ConnectBuilderST extends AbstractST { @ParallelTest void testBuildFailsWithWrongChecksumOfArtifact(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); final String imageName = getImageNameForTestCase(); @@ -215,7 +215,7 @@ void testBuildFailsWithWrongChecksumOfArtifact(ExtensionContext extensionContext @ParallelTest void testBuildWithJarTgzAndZip(ExtensionContext extensionContext) { - TestStorage 
testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // this test also testing push into Docker output final String imageName = getImageNameForTestCase(); @@ -276,7 +276,7 @@ void testBuildWithJarTgzAndZip(ExtensionContext extensionContext) { @OpenShiftOnly @ParallelTest void testPushIntoImageStream(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); String imageStreamName = "custom-image-stream"; ImageStream imageStream = new ImageStreamBuilder() @@ -316,7 +316,7 @@ void testPushIntoImageStream(ExtensionContext extensionContext) { @ParallelTest void testUpdateConnectWithAnotherPlugin(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); String camelConnector = "camel-http-connector"; final String imageName = getImageNameForTestCase(); @@ -415,7 +415,7 @@ void testUpdateConnectWithAnotherPlugin(ExtensionContext extensionContext) { @ParallelTest void testBuildOtherPluginTypeWithAndWithoutFileName(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); final String imageName = getImageNameForTestCase(); @@ -475,7 +475,7 @@ void testBuildOtherPluginTypeWithAndWithoutFileName(ExtensionContext extensionCo @KindNotSupported("using kind we encounter (error building image: deleting file system after stage 0: unlinkat //product_uuid: device or resource busy)") @ParallelTest void testBuildPluginUsingMavenCoordinatesArtifacts(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); final String imageName = getImageNameForTestCase(); final String connectorName = 
testStorage.getClusterName() + "-camel-connector"; diff --git a/systemtest/src/test/java/io/strimzi/systemtest/connect/ConnectST.java b/systemtest/src/test/java/io/strimzi/systemtest/connect/ConnectST.java index 3466890b41e..954ad30bd1e 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/connect/ConnectST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/connect/ConnectST.java @@ -116,7 +116,7 @@ class ConnectST extends AbstractST { @ParallelNamespaceTest void testDeployRollUndeploy(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); final int connectReplicasCount = 2; @@ -244,7 +244,7 @@ void testKafkaConnectAndConnectorStateWithFileSinkPlugin(ExtensionContext extens @ParallelNamespaceTest @Tag(INTERNAL_CLIENTS_USED) void testKafkaConnectWithPlainAndScramShaAuthentication(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // Use a Kafka with plain listener disabled resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getClusterName(), 3) @@ -326,7 +326,7 @@ void testKafkaConnectWithPlainAndScramShaAuthentication(ExtensionContext extensi @Tag(CONNECTOR_OPERATOR) @Tag(INTERNAL_CLIENTS_USED) void testKafkaConnectAndConnectorFileSinkPlugin(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getClusterName(), 3).build()); @@ -412,7 +412,7 @@ void testJvmAndResources(ExtensionContext extensionContext) { @ParallelNamespaceTest @Tag(COMPONENT_SCALING) void testKafkaConnectScaleUpScaleDown(ExtensionContext extensionContext) { - TestStorage testStorage = new 
TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); final String namespaceName = StUtils.getNamespaceBasedOnRbac(Environment.TEST_SUITE_NAMESPACE, extensionContext); final String clusterName = testStorage.getClusterName(); @@ -446,7 +446,7 @@ void testKafkaConnectScaleUpScaleDown(ExtensionContext extensionContext) { @ParallelNamespaceTest @Tag(INTERNAL_CLIENTS_USED) void testSecretsWithKafkaConnectWithTlsAndTlsClientAuthentication(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getClusterName(), 3) .editSpec() @@ -527,7 +527,7 @@ void testSecretsWithKafkaConnectWithTlsAndTlsClientAuthentication(ExtensionConte @ParallelNamespaceTest @Tag(INTERNAL_CLIENTS_USED) void testSecretsWithKafkaConnectWithTlsAndScramShaAuthentication(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getClusterName(), 3) .editSpec() @@ -606,7 +606,7 @@ void testSecretsWithKafkaConnectWithTlsAndScramShaAuthentication(ExtensionContex @ParallelNamespaceTest @MicroShiftNotSupported("The test is using Connect Build feature that is not available on MicroShift") void testConnectorTaskAutoRestart(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getClusterName(), 3).build()); @@ -788,7 +788,7 @@ void testCustomAndUpdatedValues(ExtensionContext extensionContext) { @Tag(INTERNAL_CLIENTS_USED) @Tag(ACCEPTANCE) 
void testMultiNodeKafkaConnectWithConnectorCreation(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); final String connectClusterName = testStorage.getClusterName() + "-2"; @@ -818,7 +818,7 @@ void testMultiNodeKafkaConnectWithConnectorCreation(ExtensionContext extensionCo KafkaClients kafkaClients = new KafkaClientsBuilder() .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(testStorage.getClusterName())) .withProducerName(testStorage.getProducerName()) .withConsumerName(testStorage.getConsumerName()) @@ -972,6 +972,7 @@ void testConnectScramShaAuthWithWeirdUserName(ExtensionContext extensionContext) } void testConnectAuthorizationWithWeirdUserName(ExtensionContext extensionContext, String clusterName, String userName, SecurityProtocol securityProtocol, String topicName) { + final TestStorage testStorage = storageMap.get(extensionContext); final String namespaceName = StUtils.getNamespaceBasedOnRbac(Environment.TEST_SUITE_NAMESPACE, extensionContext); final String connectorPodName = kubeClient(namespaceName).listPodsByPrefixInName(namespaceName, clusterName + "-connect").get(0).getMetadata().getName(); @@ -987,13 +988,13 @@ void testConnectAuthorizationWithWeirdUserName(ExtensionContext extensionContext .withNamespaceName(namespaceName) .withClusterName(clusterName) .withKafkaUsername(userName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withSecurityProtocol(securityProtocol) .withTopicName(topicName) .withListenerName(TestConstants.EXTERNAL_LISTENER_DEFAULT_NAME) .build(); - assertThat(externalKafkaClient.sendMessagesTls(), is(MESSAGE_COUNT)); + assertThat(externalKafkaClient.sendMessagesTls(), is(testStorage.getMessageCount())); 
KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(namespaceName, connectorPodName, TestConstants.DEFAULT_SINK_FILE_PATH, "\"Hello-world - 99\""); } diff --git a/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java b/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java index ae2b9e592d6..8d7c6e8a280 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java @@ -208,7 +208,7 @@ void testCruiseControlWithRebalanceResourceAndRefreshAnnotation(ExtensionContext @IsolatedTest void testCruiseControlChangesFromRebalancingtoProposalReadyWhenSpecUpdated(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext, Environment.TEST_SUITE_NAMESPACE); + final TestStorage testStorage = new TestStorage(extensionContext, Environment.TEST_SUITE_NAMESPACE); resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaWithCruiseControl(testStorage.getClusterName(), 3, 1).build()); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/kafka/ConfigProviderST.java b/systemtest/src/test/java/io/strimzi/systemtest/kafka/ConfigProviderST.java index d1293148b94..49b120b6a1c 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/kafka/ConfigProviderST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/kafka/ConfigProviderST.java @@ -152,7 +152,7 @@ void testConnectWithConnectorUsingConfigAndEnvProvider(ExtensionContext extensio .withProducerName(producerName) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(clusterName)) .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withDelayMs(0) .withNamespaceName(namespaceName) .build(); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/kafka/dynamicconfiguration/DynamicConfST.java 
b/systemtest/src/test/java/io/strimzi/systemtest/kafka/dynamicconfiguration/DynamicConfST.java index 5f2aa812135..28853cfd005 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/kafka/dynamicconfiguration/DynamicConfST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/kafka/dynamicconfiguration/DynamicConfST.java @@ -298,7 +298,7 @@ void testUpdateToExternalListenerCausesRollingRestartUsingExternalClients(Extens .withTopicName(topicName) .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) .withClusterName(clusterName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withKafkaUsername(userName) .withSecurityProtocol(SecurityProtocol.SSL) .withListenerName(TestConstants.EXTERNAL_LISTENER_DEFAULT_NAME) @@ -308,7 +308,7 @@ void testUpdateToExternalListenerCausesRollingRestartUsingExternalClients(Extens .withTopicName(topicName) .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) .withClusterName(clusterName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withSecurityProtocol(SecurityProtocol.PLAINTEXT) .withListenerName(TestConstants.EXTERNAL_LISTENER_DEFAULT_NAME) .build(); @@ -348,7 +348,7 @@ void testUpdateToExternalListenerCausesRollingRestartUsingExternalClients(Extens kafkaPods = RollingUpdateUtils.waitTillComponentHasRolled(Environment.TEST_SUITE_NAMESPACE, kafkaSelector, KAFKA_REPLICAS, kafkaPods); externalKafkaClientTls.verifyProducedAndConsumedMessages( - externalKafkaClientTls.sendMessagesTls() + MESSAGE_COUNT, + externalKafkaClientTls.sendMessagesTls() + testStorage.getMessageCount(), externalKafkaClientTls.receiveMessagesTls() ); @@ -379,7 +379,7 @@ void testUpdateToExternalListenerCausesRollingRestartUsingExternalClients(Extens }); externalKafkaClientPlain.verifyProducedAndConsumedMessages( - externalKafkaClientPlain.sendMessagesPlain() + MESSAGE_COUNT, + externalKafkaClientPlain.sendMessagesPlain() + testStorage.getMessageCount(), 
externalKafkaClientPlain.receiveMessagesPlain() ); } diff --git a/systemtest/src/test/java/io/strimzi/systemtest/kafka/dynamicconfiguration/DynamicConfSharedST.java b/systemtest/src/test/java/io/strimzi/systemtest/kafka/dynamicconfiguration/DynamicConfSharedST.java index 35198a71731..23b56c5e86c 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/kafka/dynamicconfiguration/DynamicConfSharedST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/kafka/dynamicconfiguration/DynamicConfSharedST.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ThreadLocalRandom; import static io.strimzi.systemtest.TestConstants.DYNAMIC_CONFIGURATION; @@ -49,6 +50,7 @@ public class DynamicConfSharedST extends AbstractST { private final String dynamicConfigurationSharedClusterName = "dynamic-config-shared"; private String scraperPodName; + private static Random rng = new Random(); @TestFactory Iterator testDynConfiguration() { diff --git a/systemtest/src/test/java/io/strimzi/systemtest/kafka/listeners/ListenersST.java b/systemtest/src/test/java/io/strimzi/systemtest/kafka/listeners/ListenersST.java index 32a3434b619..3116f30ecd5 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/kafka/listeners/ListenersST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/kafka/listeners/ListenersST.java @@ -431,7 +431,7 @@ void testNodePort(ExtensionContext extensionContext) { .withTopicName(topicName) .withNamespaceName(namespaceName) .withClusterName(clusterName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withListenerName(TestConstants.EXTERNAL_LISTENER_DEFAULT_NAME) .build(); @@ -521,7 +521,7 @@ void testOverrideNodePortConfiguration(ExtensionContext extensionContext) { .withTopicName(topicName) .withNamespaceName(namespaceName) .withClusterName(clusterName) - .withMessageCount(MESSAGE_COUNT) + 
.withMessageCount(testStorage.getMessageCount()) .withListenerName(TestConstants.EXTERNAL_LISTENER_DEFAULT_NAME) .build(); @@ -563,7 +563,7 @@ void testNodePortTls(ExtensionContext extensionContext) { .withTopicName(topicName) .withNamespaceName(namespaceName) .withClusterName(clusterName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withKafkaUsername(userName) .withSecurityProtocol(SecurityProtocol.SSL) .withListenerName(TestConstants.EXTERNAL_LISTENER_DEFAULT_NAME) @@ -607,7 +607,7 @@ void testLoadBalancer(ExtensionContext extensionContext) { .withTopicName(topicName) .withNamespaceName(namespaceName) .withClusterName(clusterName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withListenerName(TestConstants.EXTERNAL_LISTENER_DEFAULT_NAME) .build(); @@ -655,7 +655,7 @@ void testLoadBalancerTls(ExtensionContext extensionContext) { .withTopicName(topicName) .withNamespaceName(namespaceName) .withClusterName(clusterName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withKafkaUsername(userName) .withSecurityProtocol(SecurityProtocol.SSL) .withListenerName(TestConstants.EXTERNAL_LISTENER_DEFAULT_NAME) @@ -693,17 +693,17 @@ void testClusterIp(ExtensionContext extensionContext) { .withNamespaceName(namespaceName) .withTopicName(testStorage.getTopicName()) .withBootstrapAddress(KafkaUtils.bootstrapAddressFromStatus(clusterName, namespaceName, TestConstants.CLUSTER_IP_LISTENER_DEFAULT_NAME)) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withUsername(testStorage.getUsername()) .withProducerName(testStorage.getProducerName()) .withConsumerName(testStorage.getConsumerName()) .build(); resourceManager.createResourceWithWait(extensionContext, kafkaClients.producerStrimzi()); - ClientUtils.waitForClientSuccess(testStorage.getProducerName(), namespaceName, MESSAGE_COUNT); + 
ClientUtils.waitForClientSuccess(testStorage.getProducerName(), namespaceName, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, kafkaClients.consumerStrimzi()); - ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), namespaceName, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), namespaceName, testStorage.getMessageCount()); } diff --git a/systemtest/src/test/java/io/strimzi/systemtest/kafka/listeners/MultipleListenersST.java b/systemtest/src/test/java/io/strimzi/systemtest/kafka/listeners/MultipleListenersST.java index b5d2ba700b4..4272debb2b6 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/kafka/listeners/MultipleListenersST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/kafka/listeners/MultipleListenersST.java @@ -164,6 +164,7 @@ void testCombinationOfEveryKindOfListener(ExtensionContext extensionContext) thr } private void runListenersTest(ExtensionContext extensionContext, List listeners, String clusterName) { + final TestStorage testStorage = storageMap.get(extensionContext); LOGGER.info("These are listeners to be verified: {}", listeners); // exercise phase @@ -200,7 +201,7 @@ private void runListenersTest(ExtensionContext extensionContext, List loggers = new HashMap<>(); loggers.put("connect.root.logger.level", "OFF"); @@ -1350,7 +1350,7 @@ void testNotExistingCMSetsDefaultLogging(ExtensionContext extensionContext) { @ParallelNamespaceTest void testLoggingHierarchy(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getClusterName(), 3).build()); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/metrics/MetricsST.java b/systemtest/src/test/java/io/strimzi/systemtest/metrics/MetricsST.java index b8ad70224ee..0dbfa7b2017 100644 --- 
a/systemtest/src/test/java/io/strimzi/systemtest/metrics/MetricsST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/metrics/MetricsST.java @@ -42,6 +42,7 @@ import io.strimzi.systemtest.resources.ResourceManager; import io.strimzi.systemtest.resources.crd.KafkaResource; import io.strimzi.systemtest.resources.kubernetes.NetworkPolicyResource; +import io.strimzi.systemtest.storage.TestStorage; import io.strimzi.systemtest.templates.crd.KafkaBridgeTemplates; import io.strimzi.systemtest.templates.crd.KafkaConnectTemplates; import io.strimzi.systemtest.templates.crd.KafkaConnectorTemplates; @@ -295,6 +296,7 @@ void testKafkaConnectAndConnectorMetrics(ExtensionContext extensionContext) { @IsolatedTest @Tag(INTERNAL_CLIENTS_USED) void testKafkaExporterMetrics(ExtensionContext extensionContext) { + final TestStorage testStorage = storageMap.get(extensionContext); final String producerName = "producer-" + new Random().nextInt(Integer.MAX_VALUE); final String consumerName = "consumer-" + new Random().nextInt(Integer.MAX_VALUE); final String kafkaStrimziPodSetName = KafkaResources.kafkaComponentName(kafkaClusterFirstName); @@ -310,7 +312,7 @@ void testKafkaExporterMetrics(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, kafkaClients.producerStrimzi(), kafkaClients.consumerStrimzi()); - ClientUtils.waitForClientsSuccess(producerName, consumerName, namespaceFirst, MESSAGE_COUNT, false); + ClientUtils.waitForClientsSuccess(producerName, consumerName, namespaceFirst, testStorage.getMessageCount(), false); assertMetricValueNotNull(kafkaExporterCollector, "kafka_consumergroup_current_offset\\{.*\\}"); @@ -541,6 +543,7 @@ void testMirrorMaker2Metrics(ExtensionContext extensionContext) { @ParallelTest @Tag(BRIDGE) void testKafkaBridgeMetrics(ExtensionContext extensionContext) { + final TestStorage testStorage = storageMap.get(extensionContext); String producerName = "bridge-producer"; String consumerName = 
"bridge-consumer"; @@ -563,7 +566,7 @@ void testKafkaBridgeMetrics(ExtensionContext extensionContext) { .withConsumerName(consumerName) .withBootstrapAddress(KafkaBridgeResources.serviceName(bridgeClusterName)) .withTopicName(bridgeTopicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT) .withDelayMs(200) .withPollInterval(200) diff --git a/systemtest/src/test/java/io/strimzi/systemtest/mirrormaker/MirrorMaker2ST.java b/systemtest/src/test/java/io/strimzi/systemtest/mirrormaker/MirrorMaker2ST.java index 538279d8093..f58ac56c707 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/mirrormaker/MirrorMaker2ST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/mirrormaker/MirrorMaker2ST.java @@ -905,7 +905,7 @@ void testKafkaMirrorMaker2ConnectorsState(ExtensionContext extensionContext) { @KRaftWithoutUTONotSupported @SuppressWarnings({"checkstyle:MethodLength"}) void testKMM2RollAfterSecretsCertsUpdateScramSha(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); final String customSecretSource = "custom-secret-source"; final String customSecretTarget = "custom-secret-target"; @@ -1001,7 +1001,7 @@ void testKMM2RollAfterSecretsCertsUpdateScramSha(ExtensionContext extensionConte .withUsername(testStorage.getSourceUsername()) .withNamespaceName(testStorage.getNamespaceName()) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .build(); resourceManager.createResourceWithWait(extensionContext, clients.producerScramShaTlsStrimzi(testStorage.getSourceClusterName()), clients.consumerScramShaTlsStrimzi(testStorage.getSourceClusterName())); @@ -1060,7 +1060,7 @@ void testKMM2RollAfterSecretsCertsUpdateScramSha(ExtensionContext extensionConte @ParallelNamespaceTest 
@SuppressWarnings({"checkstyle:MethodLength"}) void testKMM2RollAfterSecretsCertsUpdateTLS(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // Deploy source kafka with tls listener and mutual tls auth resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaPersistent(testStorage.getSourceClusterName(), 1) @@ -1155,7 +1155,7 @@ void testKMM2RollAfterSecretsCertsUpdateTLS(ExtensionContext extensionContext) { .withUsername(testStorage.getSourceUsername()) .withNamespaceName(testStorage.getNamespaceName()) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .build(); resourceManager.createResourceWithWait(extensionContext, clients.producerTlsStrimzi(testStorage.getSourceClusterName()), clients.consumerTlsStrimzi(testStorage.getSourceClusterName())); @@ -1167,7 +1167,7 @@ void testKMM2RollAfterSecretsCertsUpdateTLS(ExtensionContext extensionContext) { .withUsername(testStorage.getTargetUsername()) .build(); - LOGGER.info("Consumer in target cluster and Topic should receive {} messages", MESSAGE_COUNT); + LOGGER.info("Consumer in target cluster and Topic should receive {} messages", testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, clients.consumerTlsStrimzi(testStorage.getTargetClusterName())); ClientUtils.waitForConsumerClientSuccess(testStorage); @@ -1209,7 +1209,7 @@ void testKMM2RollAfterSecretsCertsUpdateTLS(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, clients.producerTlsStrimzi(testStorage.getSourceClusterName())); ClientUtils.waitForProducerClientSuccess(testStorage); - LOGGER.info("Consumer in target cluster and Topic should receive {} messages", MESSAGE_COUNT); + LOGGER.info("Consumer in target cluster and Topic should receive {} messages", 
testStorage.getMessageCount()); clients = new KafkaClientsBuilder(clients) .withTopicName(testStorage.getMirroredSourceTopicName()) @@ -1254,7 +1254,7 @@ void testKMM2RollAfterSecretsCertsUpdateTLS(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, clients.producerTlsStrimzi(testStorage.getSourceClusterName())); ClientUtils.waitForProducerClientSuccess(testStorage); - LOGGER.info("Consumer in target cluster and Topic should receive {} messages", MESSAGE_COUNT); + LOGGER.info("Consumer in target cluster and Topic should receive {} messages", testStorage.getMessageCount()); clients = new KafkaClientsBuilder(clients) .withTopicName(testStorage.getMirroredSourceTopicName()) diff --git a/systemtest/src/test/java/io/strimzi/systemtest/operators/CustomResourceStatusST.java b/systemtest/src/test/java/io/strimzi/systemtest/operators/CustomResourceStatusST.java index 9abe9b3f5b8..a471c24b168 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/operators/CustomResourceStatusST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/operators/CustomResourceStatusST.java @@ -109,14 +109,15 @@ class CustomResourceStatusST extends AbstractST { @Tag(NODEPORT_SUPPORTED) @Tag(EXTERNAL_CLIENTS_USED) void testKafkaStatus(ExtensionContext extensionContext) { + final TestStorage testStorage = storageMap.get(extensionContext); LOGGER.info("Checking status of deployed Kafka cluster"); KafkaUtils.waitForKafkaReady(Environment.TEST_SUITE_NAMESPACE, CUSTOM_RESOURCE_STATUS_CLUSTER_NAME); ExternalKafkaClient externalKafkaClient = new ExternalKafkaClient.Builder() - .withTopicName(TOPIC_NAME) + .withTopicName(testStorage.getTopicName()) .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) .withClusterName(CUSTOM_RESOURCE_STATUS_CLUSTER_NAME) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withListenerName(TestConstants.EXTERNAL_LISTENER_DEFAULT_NAME) .build(); @@ -416,6 +417,7 @@ void 
testKafkaMirrorMaker2WrongBootstrap(ExtensionContext extensionContext) { @BeforeAll void setup(ExtensionContext extensionContext) { + final TestStorage testStorage = new TestStorage(extensionContext); this.clusterOperator = this.clusterOperator.defaultInstallation(extensionContext) .withOperationTimeout(TestConstants.CO_OPERATION_TIMEOUT_SHORT) .createInstallation() @@ -458,7 +460,7 @@ void setup(ExtensionContext extensionContext) { .endSpec(); resourceManager.createResourceWithWait(extensionContext, kafkaBuilder.build()); - resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(CUSTOM_RESOURCE_STATUS_CLUSTER_NAME, TOPIC_NAME, Environment.TEST_SUITE_NAMESPACE).build()); + resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(CUSTOM_RESOURCE_STATUS_CLUSTER_NAME, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build()); } void assertKafkaStatus(long expectedObservedGeneration, String internalAddress) { diff --git a/systemtest/src/test/java/io/strimzi/systemtest/operators/MultipleClusterOperatorsST.java b/systemtest/src/test/java/io/strimzi/systemtest/operators/MultipleClusterOperatorsST.java index 5214143e2b5..ac9ced89d3a 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/operators/MultipleClusterOperatorsST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/operators/MultipleClusterOperatorsST.java @@ -127,7 +127,7 @@ void testMultipleCOsInDifferentNamespaces(ExtensionContext extensionContext) { // Strimzi is deployed with cluster-wide access in this class STRIMZI_RBAC_SCOPE=NAMESPACE won't work assumeFalse(Environment.isNamespaceRbacScope()); - TestStorage testStorage = new TestStorage(extensionContext, DEFAULT_NAMESPACE); + final TestStorage testStorage = new TestStorage(extensionContext, DEFAULT_NAMESPACE); String firstCOScraperName = FIRST_NAMESPACE + "-" + TestConstants.SCRAPER_NAME; String secondCOScraperName = SECOND_NAMESPACE + "-" + TestConstants.SCRAPER_NAME; @@ -196,11 
+196,11 @@ void testMultipleCOsInDifferentNamespaces(ExtensionContext extensionContext) { .withConsumerName(testStorage.getConsumerName()) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(testStorage.getClusterName())) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .build(); resourceManager.createResourceWithWait(extensionContext, basicClients.producerStrimzi()); - ClientUtils.waitForClientSuccess(testStorage.getProducerName(), testStorage.getNamespaceName(), MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(testStorage.getProducerName(), testStorage.getNamespaceName(), testStorage.getMessageCount()); KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), kafkaConnectPodName, TestConstants.DEFAULT_SINK_FILE_PATH, "Hello-world - 99"); @@ -255,7 +255,7 @@ void testMultipleCOsInDifferentNamespaces(ExtensionContext extensionContext) { @KRaftNotSupported("The scaling of the Kafka Pods is not working properly at the moment") void testKafkaCCAndRebalanceWithMultipleCOs(ExtensionContext extensionContext) { assumeFalse(Environment.isNamespaceRbacScope()); - TestStorage testStorage = new TestStorage(extensionContext, DEFAULT_NAMESPACE); + final TestStorage testStorage = new TestStorage(extensionContext, DEFAULT_NAMESPACE); LabelSelector kafkaSelector = KafkaResource.getLabelSelector(testStorage.getClusterName(), KafkaResources.kafkaComponentName(testStorage.getClusterName())); int scaleTo = 4; diff --git a/systemtest/src/test/java/io/strimzi/systemtest/operators/RecoveryST.java b/systemtest/src/test/java/io/strimzi/systemtest/operators/RecoveryST.java index 2c95a6a110d..5c7f86a25e5 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/operators/RecoveryST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/operators/RecoveryST.java @@ -88,7 +88,7 @@ void testRecoveryFromZookeeperStrimziPodSetDeletion() { @IsolatedTest("We need for each 
test case its own Cluster Operator") void testRecoveryFromKafkaServiceDeletion(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // kafka cluster already deployed LOGGER.info("Running deleteKafkaService with cluster {}", sharedClusterName); @@ -106,7 +106,7 @@ void testRecoveryFromKafkaServiceDeletion(ExtensionContext extensionContext) { @IsolatedTest("We need for each test case its own Cluster Operator") @KRaftNotSupported("Zookeeper is not supported by KRaft mode and is used in this test class") void testRecoveryFromZookeeperServiceDeletion(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // kafka cluster already deployed LOGGER.info("Running deleteKafkaService with cluster {}", sharedClusterName); @@ -124,7 +124,7 @@ void testRecoveryFromZookeeperServiceDeletion(ExtensionContext extensionContext) @IsolatedTest("We need for each test case its own Cluster Operator") void testRecoveryFromKafkaHeadlessServiceDeletion(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // kafka cluster already deployed LOGGER.info("Running deleteKafkaHeadlessService with cluster {}", sharedClusterName); @@ -143,7 +143,7 @@ void testRecoveryFromKafkaHeadlessServiceDeletion(ExtensionContext extensionCont @IsolatedTest("We need for each test case its own Cluster Operator") @KRaftNotSupported("Zookeeper is not supported by KRaft mode and is used in this test class") void testRecoveryFromZookeeperHeadlessServiceDeletion(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // kafka cluster already deployed LOGGER.info("Running 
deleteKafkaHeadlessService with cluster {}", sharedClusterName); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/rollingupdate/AlternativeReconcileTriggersST.java b/systemtest/src/test/java/io/strimzi/systemtest/rollingupdate/AlternativeReconcileTriggersST.java index 873f5bc7495..37374926384 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/rollingupdate/AlternativeReconcileTriggersST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/rollingupdate/AlternativeReconcileTriggersST.java @@ -125,7 +125,7 @@ void testManualTriggeringRollingUpdate(ExtensionContext extensionContext) { .withConsumerName(testStorage.getConsumerName()) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(testStorage.getClusterName())) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withNamespaceName(testStorage.getNamespaceName()) .withUsername(testStorage.getUsername()) .build(); @@ -406,7 +406,7 @@ void testAddingAndRemovingJbodVolumes(ExtensionContext extensionContext) { .withConsumerName(testStorage.getConsumerName()) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(testStorage.getClusterName())) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withNamespaceName(testStorage.getNamespaceName()) .withUsername(testStorage.getUsername()) .build(); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/security/SecurityST.java b/systemtest/src/test/java/io/strimzi/systemtest/security/SecurityST.java index 65c1a538d6e..abd7b6c201d 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/security/SecurityST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/security/SecurityST.java @@ -1042,17 +1042,17 @@ void testAclWithSuperUser(ExtensionContext extensionContext) { .withNamespaceName(namespaceName) .withClusterName(clusterName) .withKafkaUsername(userName) - 
.withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withSecurityProtocol(SecurityProtocol.SSL) .withListenerName(TestConstants.EXTERNAL_LISTENER_DEFAULT_NAME) .build(); - assertThat(externalKafkaClient.sendMessagesTls(), is(MESSAGE_COUNT)); + assertThat(externalKafkaClient.sendMessagesTls(), is(testStorage.getMessageCount())); LOGGER.info("Checking Kafka super user: {}/{} that is able to read messages to Topic: {}/{} regardless that " + "we configured Acls with only write operation", namespaceName, userName, namespaceName, topicName); - assertThat(externalKafkaClient.receiveMessagesTls(), is(MESSAGE_COUNT)); + assertThat(externalKafkaClient.receiveMessagesTls(), is(testStorage.getMessageCount())); String nonSuperuserName = userName + "-non-super-user"; @@ -1075,7 +1075,7 @@ void testAclWithSuperUser(ExtensionContext extensionContext) { .withKafkaUsername(nonSuperuserName) .build(); - assertThat(externalKafkaClient.sendMessagesTls(), is(MESSAGE_COUNT)); + assertThat(externalKafkaClient.sendMessagesTls(), is(testStorage.getMessageCount())); LOGGER.info("Checking Kafka super user: {}/{} that is not able to read messages to Topic: {}/{} because of defined" + " ACLs on only write operation", namespaceName, nonSuperuserName, namespaceName, topicName); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthAuthorizationST.java b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthAuthorizationST.java index 5d212a3aeaf..25353613e95 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthAuthorizationST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthAuthorizationST.java @@ -105,7 +105,7 @@ void smokeTestForClients(ExtensionContext extensionContext) { .withConsumerName(teamAConsumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + 
.withMessageCount(testStorage.getMessageCount()) .withConsumerGroup(consumerGroup) .withOauthClientId(TEAM_A_CLIENT) .withOauthClientSecret(TEAM_A_CLIENT_SECRET) @@ -113,9 +113,9 @@ void smokeTestForClients(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.consumerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamAConsumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAConsumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @Description("As a member of team A, I should be able to write to topics that starts with x- on any cluster and " + @@ -139,14 +139,14 @@ void testTeamAWriteToTopic(ExtensionContext extensionContext) { .withConsumerName(teamAConsumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withConsumerGroup(consumerGroup) .withOauthClientId(TEAM_A_CLIENT) .withOauthClientSecret(TEAM_A_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) .build(); - LOGGER.info("Sending {} messages to Broker with Topic name {}", MESSAGE_COUNT, topicName); + LOGGER.info("Sending {} messages to Broker with Topic name {}", testStorage.getMessageCount(), topicName); LOGGER.info("Producer will not produce messages because authorization Topic will failed. 
Team A can write only to Topic starting with 'x-'"); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.producerStrimziOauthTls(oauthClusterName)); @@ -154,7 +154,7 @@ void testTeamAWriteToTopic(ExtensionContext extensionContext) { JobUtils.deleteJobWithWait(Environment.TEST_SUITE_NAMESPACE, teamAProducerName); String topicXName = TOPIC_X + "-" + clusterName; - LOGGER.info("Sending {} messages to Broker with Topic name {}", MESSAGE_COUNT, topicXName); + LOGGER.info("Sending {} messages to Broker with Topic name {}", testStorage.getMessageCount(), topicXName); teamAOauthClientJob = new KafkaOauthClientsBuilder(teamAOauthClientJob) .withConsumerGroup(consumerGroup) @@ -168,11 +168,11 @@ void testTeamAWriteToTopic(ExtensionContext extensionContext) { // Team A can not create topic starting with 'x-' only write to existing on resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(oauthClusterName, topicXName, Environment.TEST_SUITE_NAMESPACE).build()); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); String topicAName = TOPIC_A + "-" + clusterName; - LOGGER.info("Sending {} messages to Broker with Topic name {}", MESSAGE_COUNT, topicAName); + LOGGER.info("Sending {} messages to Broker with Topic name {}", testStorage.getMessageCount(), topicAName); teamAOauthClientJob = new KafkaOauthClientsBuilder(teamAOauthClientJob) .withConsumerGroup(consumerGroup) @@ -180,7 +180,7 @@ void testTeamAWriteToTopic(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamAProducerName, 
Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @Description("As a member of team A, I should be able only read from consumer that starts with a_") @@ -202,16 +202,16 @@ void testTeamAReadFromTopic(ExtensionContext extensionContext) { .withConsumerName(teamAConsumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(topicAName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withConsumerGroup(consumerGroup) .withOauthClientId(TEAM_A_CLIENT) .withOauthClientSecret(TEAM_A_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) .build(); - LOGGER.info("Sending {} messages to Broker with Topic name {}", MESSAGE_COUNT, topicAName); + LOGGER.info("Sending {} messages to Broker with Topic name {}", testStorage.getMessageCount(), topicAName); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); // team A client shouldn't be able to consume messages with wrong consumer group @@ -232,7 +232,7 @@ void testTeamAReadFromTopic(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @Description("As a member of team B, I should be able to write and read from topics that starts with b-") @@ -254,20 +254,20 @@ void 
testTeamBWriteToTopic(ExtensionContext extensionContext) { .withConsumerName(teamBConsumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withConsumerGroup(consumerGroup) .withOauthClientId(TEAM_B_CLIENT) .withOauthClientSecret(TEAM_B_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) .build(); - LOGGER.info("Sending {} messages to Broker with Topic name {}", MESSAGE_COUNT, TOPIC_NAME); + LOGGER.info("Sending {} messages to Broker with Topic name {}", testStorage.getMessageCount(), testStorage.getTopicName()); // Producer will not produce messages because authorization topic will failed. Team A can write only to topic starting with 'x-' resourceManager.createResourceWithWait(extensionContext, teamBOauthClientJob.producerStrimziOauthTls(oauthClusterName)); JobUtils.waitForJobFailure(teamBProducerName, Environment.TEST_SUITE_NAMESPACE, 30_000); JobUtils.deleteJobWithWait(Environment.TEST_SUITE_NAMESPACE, teamBProducerName); - LOGGER.info("Sending {} messages to Broker with Topic name {}", MESSAGE_COUNT, TOPIC_B); + LOGGER.info("Sending {} messages to Broker with Topic name {}", testStorage.getMessageCount(), TOPIC_B); teamBOauthClientJob = new KafkaOauthClientsBuilder(teamBOauthClientJob) .withConsumerGroup("x-consumer_group_b-" + clusterName) @@ -276,7 +276,7 @@ void testTeamBWriteToTopic(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, teamBOauthClientJob.producerStrimziOauthTls(oauthClusterName)); resourceManager.createResourceWithWait(extensionContext, teamBOauthClientJob.consumerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientsSuccess(teamBProducerName, teamBConsumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientsSuccess(teamBProducerName, teamBConsumerName, 
Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @Description("As a member of team A, I can write to topics starting with 'x-' and " + @@ -303,7 +303,7 @@ void testTeamAWriteToTopicStartingWithXAndTeamBReadFromTopicStartingWithX(Extens .withConsumerName(teamAConsumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(topicXName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withConsumerGroup(consumerGroup) .withOauthClientId(TEAM_A_CLIENT) .withOauthClientSecret(TEAM_A_CLIENT_SECRET) @@ -316,7 +316,7 @@ void testTeamAWriteToTopicStartingWithXAndTeamBReadFromTopicStartingWithX(Extens .build(); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); KafkaOauthClients teamBOauthClientJob = new KafkaOauthClientsBuilder() .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) @@ -324,7 +324,7 @@ void testTeamAWriteToTopicStartingWithXAndTeamBReadFromTopicStartingWithX(Extens .withConsumerName(teamBConsumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(topicXName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withConsumerGroup("x-consumer_group_b-" + clusterName) .withOauthClientId(TEAM_B_CLIENT) .withOauthClientSecret(TEAM_B_CLIENT_SECRET) @@ -332,7 +332,7 @@ void testTeamAWriteToTopicStartingWithXAndTeamBReadFromTopicStartingWithX(Extens .build(); resourceManager.createResourceWithWait(extensionContext, teamBOauthClientJob.consumerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamBConsumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + 
ClientUtils.waitForClientSuccess(teamBConsumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @Description("As a superuser of team A and team B, i am able to break defined authorization rules") @@ -363,7 +363,7 @@ void testSuperUserWithOauthAuthorization(ExtensionContext extensionContext) { .withConsumerName(teamBConsumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(topicXName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withConsumerGroup("x-consumer_group_b-" + clusterName) .withOauthClientId(TEAM_B_CLIENT) .withOauthClientSecret(TEAM_B_CLIENT_SECRET) @@ -384,7 +384,7 @@ void testSuperUserWithOauthAuthorization(ExtensionContext extensionContext) { .withConsumerName(teamAConsumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(topicXName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withConsumerGroup("x-consumer_group_b1-" + clusterName) .withOauthClientId(TEAM_A_CLIENT) .withOauthClientSecret(TEAM_A_CLIENT_SECRET) @@ -412,7 +412,7 @@ void testSuperUserWithOauthAuthorization(ExtensionContext extensionContext) { LOGGER.info("Verifying that team B is able to write to Topic starting with 'x-' and break authorization rule"); resourceManager.createResourceWithWait(extensionContext, teamBOauthClientJob.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamBProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamBProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); LOGGER.info("Verifying that team A is able to write to Topic starting with 'x-' and break authorization rule"); @@ -422,7 +422,7 @@ void testSuperUserWithOauthAuthorization(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, 
teamAOauthClientJob.consumerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamAConsumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAConsumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } /** @@ -460,7 +460,7 @@ void testSessionReAuthentication(ExtensionContext extensionContext) { .withConsumerName(teamAConsumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(topicXName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withConsumerGroup("a-consumer_group") .withClientUserName(TEAM_A_CLIENT) .withOauthClientId(TEAM_A_CLIENT) @@ -469,7 +469,7 @@ void testSessionReAuthentication(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); LOGGER.info("Adding the maxSecondsWithoutReauthentication to Kafka listener with OAuth authentication"); KafkaResource.replaceKafkaResourceInSpecificNamespace(oauthClusterName, kafka -> { @@ -545,7 +545,7 @@ void testSessionReAuthentication(ExtensionContext extensionContext) { LOGGER.info("Changing the Dev Team A policy for Topics starting with x- and checking that Job will not be successful"); KeycloakUtils.updatePolicyOfRealmClient(Environment.TEST_SUITE_NAMESPACE, baseUri, token, newDevAPolicy, TEST_REALM, kafkaClientId); - assertThrows(WaitException.class, () -> ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT)); + assertThrows(WaitException.class, () -> ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount())); 
JobUtils.deleteJobWithWait(Environment.TEST_SUITE_NAMESPACE, teamAProducerName); @@ -556,7 +556,7 @@ void testSessionReAuthentication(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); LOGGER.info("Changing back to the original settings and checking, if the producer will be successful"); @@ -570,7 +570,7 @@ void testSessionReAuthentication(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); LOGGER.info("Changing configuration of Kafka back to it's original form"); KafkaResource.replaceKafkaResourceInSpecificNamespace(oauthClusterName, kafka -> { @@ -597,7 +597,7 @@ void testClusterVerification(ExtensionContext extensionContext) { @ParallelNamespaceTest @Order(10) void testKeycloakAuthorizerToDelegateToSimpleAuthorizer(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); if (!Environment.isNamespaceRbacScope()) { // we have to create keycloak, team-a-client and team-b-client secret from `co-namespace` to the new namespace @@ -649,7 +649,7 @@ void testKeycloakAuthorizerToDelegateToSimpleAuthorizer(ExtensionContext extensi .withConsumerName(teamAConsumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(testStorage.getClusterName())) .withTopicName(topicName) - 
.withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withConsumerGroup(consumerGroup) .withOauthClientId(TEAM_A_CLIENT) .withOauthClientSecret(TEAM_A_CLIENT_SECRET) @@ -657,9 +657,9 @@ void testKeycloakAuthorizerToDelegateToSimpleAuthorizer(ExtensionContext extensi .build(); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.producerStrimziOauthTls(testStorage.getClusterName())); - ClientUtils.waitForClientSuccess(teamAProducerName, testStorage.getNamespaceName(), MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAProducerName, testStorage.getNamespaceName(), testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, teamAOauthClientJob.consumerStrimziOauthTls(testStorage.getClusterName())); - ClientUtils.waitForClientSuccess(teamAConsumerName, testStorage.getNamespaceName(), MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(teamAConsumerName, testStorage.getNamespaceName(), testStorage.getMessageCount()); } @BeforeAll diff --git a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthPasswordGrantsST.java b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthPasswordGrantsST.java index 1e9a6660d46..140ed8291f6 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthPasswordGrantsST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthPasswordGrantsST.java @@ -82,7 +82,7 @@ void testPasswordGrantsKafkaMirrorMaker(ExtensionContext extensionContext) { .withConsumerName(testStorage.getConsumerName()) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -90,10 +90,10 @@ void 
testPasswordGrantsKafkaMirrorMaker(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(oauthClusterName, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(testStorage.getProducerName(), Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(testStorage.getProducerName(), Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getTargetClusterName(), 1, 1) .editMetadata() @@ -185,7 +185,7 @@ void testPasswordGrantsKafkaMirrorMaker(ExtensionContext extensionContext) { .withConsumerName(OAUTH_CONSUMER_NAME) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(testStorage.getTargetClusterName())) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -194,7 +194,7 @@ void testPasswordGrantsKafkaMirrorMaker(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, kafkaOauthClientJob.consumerStrimziOauthPlain()); try { - ClientUtils.waitForClientSuccess(OAUTH_CONSUMER_NAME, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(OAUTH_CONSUMER_NAME, 
Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); return true; } catch (WaitException e) { e.printStackTrace(); @@ -214,7 +214,7 @@ void testPasswordGrantsKafkaMirrorMaker2(ExtensionContext extensionContext) { .withConsumerName(testStorage.getConsumerName()) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -222,10 +222,10 @@ void testPasswordGrantsKafkaMirrorMaker2(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(oauthClusterName, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(testStorage.getProducerName(), Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(testStorage.getProducerName(), Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); String kafkaSourceClusterName = oauthClusterName; String kafkaTargetClusterName = testStorage.getTargetClusterName(); @@ -326,7 +326,7 @@ void testPasswordGrantsKafkaMirrorMaker2(ExtensionContext extensionContext) { .withConsumerName(testStorage.getConsumerName()) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(kafkaTargetClusterName)) 
.withTopicName(kafkaSourceClusterName + "." + testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -335,7 +335,7 @@ void testPasswordGrantsKafkaMirrorMaker2(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, kafkaOauthClientJob.consumerStrimziOauthPlain()); try { - ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); return true; } catch (WaitException e) { e.printStackTrace(); @@ -356,7 +356,7 @@ void testPasswordGrantsKafkaConnect(ExtensionContext extensionContext) { .withConsumerName(testStorage.getConsumerName()) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -364,10 +364,10 @@ void testPasswordGrantsKafkaConnect(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(oauthClusterName, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(testStorage.getProducerName(), Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(testStorage.getProducerName(), Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); 
resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); KafkaConnect connect = KafkaConnectTemplates.kafkaConnectWithFilePlugin(testStorage.getClusterName(), Environment.TEST_SUITE_NAMESPACE, oauthClusterName, 1) .editMetadata() @@ -431,17 +431,17 @@ void testPasswordGrantsKafkaBridge(ExtensionContext extensionContext) { .withConsumerName(testStorage.getConsumerName()) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) .build(); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(testStorage.getProducerName(), Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(testStorage.getProducerName(), Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(testStorage.getConsumerName(), Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, KafkaBridgeTemplates.kafkaBridge(oauthClusterName, KafkaResources.plainBootstrapAddress(oauthClusterName), 1) .editMetadata() @@ -470,7 +470,7 @@ void 
testPasswordGrantsKafkaBridge(ExtensionContext extensionContext) { .withProducerName(producerName) .withBootstrapAddress(KafkaBridgeResources.serviceName(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withPort(HTTP_BRIDGE_DEFAULT_PORT) .withDelayMs(1000) .withPollInterval(1000) @@ -478,7 +478,7 @@ void testPasswordGrantsKafkaBridge(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, kafkaBridgeClientJob.producerStrimziBridge()); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @BeforeAll diff --git a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthPlainST.java b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthPlainST.java index 62826e4f3b4..a77562e6e22 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthPlainST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthPlainST.java @@ -126,17 +126,17 @@ void testProducerConsumerWithOauthMetrics(ExtensionContext extensionContext) { .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) .build(); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, 
testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); assertOauthMetricsForComponent( metricsCollector.toBuilder() @@ -160,7 +160,7 @@ void testSaslPlainProducerConsumer(ExtensionContext extensionContext) { .withConsumerName(audienceConsumerName) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -172,7 +172,7 @@ void testSaslPlainProducerConsumer(ExtensionContext extensionContext) { .withProducerName(audienceProducerName) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -180,10 +180,10 @@ void testSaslPlainProducerConsumer(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, plainSaslOauthProducerClientsJob.producerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(audienceProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(audienceProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, plainSaslOauthConsumerClientsJob.consumerStrimziOauthPlain()); - 
ClientUtils.waitForClientSuccess(audienceConsumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(audienceConsumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @ParallelTest @@ -201,7 +201,7 @@ void testProducerConsumerAudienceTokenChecks(ExtensionContext extensionContext) .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.bootstrapServiceName(oauthClusterName) + ":" + audienceListenerPort) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -209,9 +209,9 @@ void testProducerConsumerAudienceTokenChecks(ExtensionContext extensionContext) LOGGER.info("Use clients without access token containing audience token"); resourceManager.createResourceWithWait(extensionContext, oauthInternalClientJob.producerStrimziOauthPlain()); - assertDoesNotThrow(() -> ClientUtils.waitForClientTimeout(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT)); + assertDoesNotThrow(() -> ClientUtils.waitForClientTimeout(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount())); resourceManager.createResourceWithWait(extensionContext, oauthInternalClientJob.consumerStrimziOauthPlain()); - assertDoesNotThrow(() -> ClientUtils.waitForClientTimeout(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT)); + assertDoesNotThrow(() -> ClientUtils.waitForClientTimeout(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount())); JobUtils.deleteJobWithWait(Environment.TEST_SUITE_NAMESPACE, producerName); JobUtils.deleteJobWithWait(Environment.TEST_SUITE_NAMESPACE, consumerName); @@ -224,17 +224,17 @@ void testProducerConsumerAudienceTokenChecks(ExtensionContext extensionContext) .withConsumerName(audienceConsumerName) 
.withBootstrapAddress(KafkaResources.bootstrapServiceName(oauthClusterName) + ":" + customClaimListenerPort) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) .build(); resourceManager.createResourceWithWait(extensionContext, oauthAudienceInternalClientJob.producerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(audienceProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(audienceProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthAudienceInternalClientJob.consumerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(audienceConsumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(audienceConsumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @ParallelTest @@ -253,7 +253,7 @@ void testAccessTokenClaimCheck(ExtensionContext extensionContext) { .withConsumerName(audienceConsumerName) .withBootstrapAddress(KafkaResources.bootstrapServiceName(oauthClusterName) + ":" + customClaimListenerPort) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthProducerClientId(OAUTH_CLIENT_AUDIENCE_PRODUCER) .withOauthConsumerClientId(OAUTH_CLIENT_AUDIENCE_CONSUMER) .withOauthClientSecret(OAUTH_CLIENT_AUDIENCE_SECRET) @@ -261,9 +261,9 @@ void testAccessTokenClaimCheck(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, oauthAudienceInternalClientJob.producerStrimziOauthPlain()); - assertDoesNotThrow(() -> ClientUtils.waitForClientTimeout(audienceProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT)); 
+ assertDoesNotThrow(() -> ClientUtils.waitForClientTimeout(audienceProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount())); resourceManager.createResourceWithWait(extensionContext, oauthAudienceInternalClientJob.consumerStrimziOauthPlain()); - assertDoesNotThrow(() -> ClientUtils.waitForClientTimeout(audienceConsumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT)); + assertDoesNotThrow(() -> ClientUtils.waitForClientTimeout(audienceConsumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount())); JobUtils.deleteJobWithWait(Environment.TEST_SUITE_NAMESPACE, audienceProducerName); JobUtils.deleteJobWithWait(Environment.TEST_SUITE_NAMESPACE, audienceConsumerName); @@ -276,16 +276,16 @@ void testAccessTokenClaimCheck(ExtensionContext extensionContext) { .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.bootstrapServiceName(oauthClusterName) + ":" + customClaimListenerPort) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) .build(); resourceManager.createResourceWithWait(extensionContext, oauthInternalClientJob.producerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthInternalClientJob.consumerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @Description("As an OAuth KafkaConnect, I should be able to sink messages from kafka Broker Topic.") @@ 
-304,7 +304,7 @@ void testProducerConsumerConnectWithOauthMetrics(ExtensionContext extensionConte .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -312,10 +312,10 @@ void testProducerConsumerConnectWithOauthMetrics(ExtensionContext extensionConte resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(oauthClusterName, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); KafkaConnect connect = KafkaConnectTemplates.kafkaConnectWithFilePlugin(oauthClusterName, Environment.TEST_SUITE_NAMESPACE, oauthClusterName, 1) .editMetadata() @@ -385,7 +385,7 @@ void testProducerConsumerMirrorMaker(ExtensionContext extensionContext) { .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) 
.withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -393,10 +393,10 @@ void testProducerConsumerMirrorMaker(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(oauthClusterName, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getTargetClusterName(), 1, 1) @@ -492,7 +492,7 @@ void testProducerConsumerMirrorMaker(ExtensionContext extensionContext) { .withConsumerName(OAUTH_CONSUMER_NAME) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(testStorage.getTargetClusterName())) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -501,7 +501,7 @@ void testProducerConsumerMirrorMaker(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, kafkaOauthClientJob.consumerStrimziOauthPlain()); try { - ClientUtils.waitForClientSuccess(OAUTH_CONSUMER_NAME, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(OAUTH_CONSUMER_NAME, 
Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); return true; } catch (WaitException e) { e.printStackTrace(); @@ -527,7 +527,7 @@ void testProducerConsumerMirrorMaker2WithOauthMetrics(ExtensionContext extension .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -535,10 +535,10 @@ void testProducerConsumerMirrorMaker2WithOauthMetrics(ExtensionContext extension resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(oauthClusterName, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); String kafkaSourceClusterName = oauthClusterName; @@ -646,7 +646,7 @@ void testProducerConsumerMirrorMaker2WithOauthMetrics(ExtensionContext extension .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(testStorage.getTargetClusterName())) .withTopicName(kafkaTargetClusterTopicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) 
.withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -655,7 +655,7 @@ void testProducerConsumerMirrorMaker2WithOauthMetrics(ExtensionContext extension resourceManager.createResourceWithWait(extensionContext, kafkaOauthClientJob.consumerStrimziOauthPlain()); try { - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); return true; } catch (WaitException e) { e.printStackTrace(); @@ -685,7 +685,7 @@ void testProducerConsumerBridgeWithOauthMetrics(ExtensionContext extensionContex .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -693,10 +693,10 @@ void testProducerConsumerBridgeWithOauthMetrics(ExtensionContext extensionContex resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(oauthClusterName, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthPlain()); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, 
testStorage.getMessageCount()); // needed for a verification of oauth configuration InlineLogging ilDebug = new InlineLogging(); @@ -733,14 +733,14 @@ void testProducerConsumerBridgeWithOauthMetrics(ExtensionContext extensionContex .withProducerName(bridgeProducerName) .withBootstrapAddress(KafkaBridgeResources.serviceName(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withPort(HTTP_BRIDGE_DEFAULT_PORT) .withDelayMs(1000) .withPollInterval(1000) .build(); resourceManager.createResourceWithWait(extensionContext, kafkaBridgeClientJob.producerStrimziBridge()); - ClientUtils.waitForClientSuccess(bridgeProducerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(bridgeProducerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); assertOauthMetricsForComponent( metricsCollector.toBuilder() @@ -751,7 +751,7 @@ void testProducerConsumerBridgeWithOauthMetrics(ExtensionContext extensionContex @ParallelTest void testSaslPlainAuthenticationKafkaConnectIsAbleToConnectToKafkaOAuth(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); resourceManager.createResourceWithoutWait(extensionContext, KafkaConnectTemplates.kafkaConnect(testStorage.getClusterName(), testStorage.getNamespaceName(), oauthClusterName, 1) .withNewSpec() diff --git a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthScopeST.java b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthScopeST.java index aac8ecfd9a9..a180be2279d 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthScopeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthScopeST.java @@ -156,7 +156,7 @@ void testClientScopeKafkaSetCorrectly(ExtensionContext extensionContext) { 
.withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.bootstrapServiceName(oauthClusterName) + ":" + scopeListenerPort) .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) // configures SASL/PLAIN to be used .withAdditionalConfig(additionalOauthConfig) .build(); @@ -168,7 +168,7 @@ void testClientScopeKafkaSetCorrectly(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, oauthInternalClientChecksJob.producerStrimzi()); // client should succeeded because we set to `clientScope=test` and also Kafka has `scope=test` - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @IsolatedTest("Modification of shared Kafka cluster") @@ -186,7 +186,7 @@ void testClientScopeKafkaSetIncorrectly(ExtensionContext extensionContext) throw .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.bootstrapServiceName(oauthClusterName) + ":" + scopeListenerPort) .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) // configures SASL/PLAIN to be used .withAdditionalConfig(additionalOauthConfig) .build(); @@ -213,7 +213,7 @@ void testClientScopeKafkaSetIncorrectly(ExtensionContext extensionContext) throw // client should fail because the listener requires scope: 'test' in JWT token but was (the listener) temporarily // configured without clientScope resulting in a JWT token without the scope claim when using the clientId and // secret passed via SASL/PLAIN to obtain an access token in client's name. 
- ClientUtils.waitForClientTimeout(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientTimeout(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); JobUtils.deleteJobWithWait(Environment.TEST_SUITE_NAMESPACE, producerName); // rollback previous configuration diff --git a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java index 36dd3315c69..e0ad5b3cbfa 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java @@ -91,17 +91,17 @@ void testProducerConsumer(ExtensionContext extensionContext) { .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) .build(); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @Description("As an OAuth KafkaConnect, I am able to sink messages from Kafka Broker topic using encrypted communication.") @@ -124,17 +124,17 @@ void 
testProducerConsumerConnect(ExtensionContext extensionContext) { .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) .build(); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); KafkaConnect connect = KafkaConnectTemplates.kafkaConnectWithFilePlugin(clusterName, Environment.TEST_SUITE_NAMESPACE, oauthClusterName, 1) .editSpec() @@ -199,17 +199,17 @@ void testProducerConsumerBridge(ExtensionContext extensionContext) { .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) .build(); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + 
ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, KafkaBridgeTemplates.kafkaBridge(oauthClusterName, KafkaResources.tlsBootstrapAddress(oauthClusterName), 1) .editMetadata() @@ -252,7 +252,7 @@ void testProducerConsumerBridge(ExtensionContext extensionContext) { .build(); resourceManager.createResourceWithWait(extensionContext, kafkaBridgeClientJob.producerStrimziBridge()); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @Description("As a OAuth MirrorMaker, I am able to replicate Topic data using using encrypted communication") @@ -276,17 +276,17 @@ void testMirrorMaker(ExtensionContext extensionContext) { .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) .build(); resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.producerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); 
resourceManager.createResourceWithWait(extensionContext, oauthExampleClients.consumerStrimziOauthTls(oauthClusterName)); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); String targetKafkaCluster = oauthClusterName + "-target"; @@ -386,8 +386,8 @@ void testMirrorMaker(ExtensionContext extensionContext) { assertThat(kafkaMirrorMakerLogs, not(containsString("keytool error: java.io.FileNotFoundException: /opt/kafka/consumer-oauth-certs/**/* (No such file or directory)"))); - resourceManager.createResourceWithWait(extensionContext, KafkaUserTemplates.tlsUser(Environment.TEST_SUITE_NAMESPACE, oauthClusterName, USER_NAME).build()); - KafkaUserUtils.waitForKafkaUserCreation(Environment.TEST_SUITE_NAMESPACE, USER_NAME); + resourceManager.createResourceWithWait(extensionContext, KafkaUserTemplates.tlsUser(Environment.TEST_SUITE_NAMESPACE, oauthClusterName, testStorage.getUsername()).build()); + KafkaUserUtils.waitForKafkaUserCreation(Environment.TEST_SUITE_NAMESPACE, testStorage.getUsername()); LOGGER.info("Creating new client with new consumer-group and also to point on {} cluster", targetKafkaCluster); @@ -395,10 +395,10 @@ void testMirrorMaker(ExtensionContext extensionContext) { .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) .withProducerName(producerName) .withConsumerName(consumerName) - .withClientUserName(USER_NAME) + .withClientUserName(testStorage.getUsername()) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(targetKafkaCluster)) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) @@ -406,7 +406,7 @@ void testMirrorMaker(ExtensionContext extensionContext) { 
resourceManager.createResourceWithWait(extensionContext, kafkaOauthClientJob.consumerStrimziOauthTls(targetKafkaCluster)); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @ParallelTest @@ -461,17 +461,17 @@ void testIntrospectionEndpoint(ExtensionContext extensionContext) { .withConsumerName(consumerName) .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(introspectionKafka)) .withTopicName(topicName) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withOauthClientId(OAUTH_CLIENT_NAME) .withOauthClientSecret(OAUTH_CLIENT_SECRET) .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) .build(); resourceManager.createResourceWithWait(extensionContext, oauthInternalClientIntrospectionJob.producerStrimziOauthTls(introspectionKafka)); - ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); resourceManager.createResourceWithWait(extensionContext, oauthInternalClientIntrospectionJob.consumerStrimziOauthTls(introspectionKafka)); - ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } @BeforeAll diff --git a/systemtest/src/test/java/io/strimzi/systemtest/specific/DrainCleanerST.java b/systemtest/src/test/java/io/strimzi/systemtest/specific/DrainCleanerST.java index 9b68b358312..4aed55ff888 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/specific/DrainCleanerST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/specific/DrainCleanerST.java @@ -48,7 +48,7 @@ public class DrainCleanerST extends AbstractST { @IsolatedTest 
@RequiredMinKubeApiVersion(version = 1.17) void testDrainCleanerWithComponents(ExtensionContext extensionContext) { - TestStorage testStorage = new TestStorage(extensionContext, TestConstants.DRAIN_CLEANER_NAMESPACE); + final TestStorage testStorage = new TestStorage(extensionContext, TestConstants.DRAIN_CLEANER_NAMESPACE); final int replicas = 3; diff --git a/systemtest/src/test/java/io/strimzi/systemtest/specific/RackAwarenessST.java b/systemtest/src/test/java/io/strimzi/systemtest/specific/RackAwarenessST.java index 04359fd60e9..26d069066df 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/specific/RackAwarenessST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/specific/RackAwarenessST.java @@ -78,7 +78,7 @@ class RackAwarenessST extends AbstractST { void testKafkaRackAwareness(ExtensionContext extensionContext) { Assumptions.assumeFalse(Environment.isNamespaceRbacScope()); - TestStorage testStorage = storageMap.get(extensionContext); + final TestStorage testStorage = storageMap.get(extensionContext); resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getClusterName(), 1, 1) .editSpec() @@ -249,7 +249,7 @@ void testConnectRackAwareness(ExtensionContext extensionContext) { void testMirrorMaker2RackAwareness(ExtensionContext extensionContext) { Assumptions.assumeFalse(Environment.isNamespaceRbacScope()); - TestStorage testStorage = storageMap.get(extensionContext); + final TestStorage testStorage = storageMap.get(extensionContext); resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getSourceClusterName(), 1, 1).build()); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/tracing/OpenTelemetryST.java b/systemtest/src/test/java/io/strimzi/systemtest/tracing/OpenTelemetryST.java index 4566451e5d7..ad358aa2813 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/tracing/OpenTelemetryST.java +++ 
b/systemtest/src/test/java/io/strimzi/systemtest/tracing/OpenTelemetryST.java @@ -303,7 +303,8 @@ void testProducerConsumerStreamsConnectService(ExtensionContext extensionContext ClientUtils.waitForClientsSuccess( testStorage.getProducerName(), testStorage.getConsumerName(), - testStorage.getNamespaceName(), MESSAGE_COUNT); + testStorage.getNamespaceName(), + testStorage.getMessageCount()); TracingUtils.verify(testStorage.getNamespaceName(), JAEGER_PRODUCER_SERVICE, testStorage.getScraperPodName(), "To_" + testStorage.getTopicName(), JAEGER_QUERY_SERVICE); TracingUtils.verify(testStorage.getNamespaceName(), JAEGER_CONSUMER_SERVICE, testStorage.getScraperPodName(), "From_" + testStorage.getTopicName(), JAEGER_QUERY_SERVICE); @@ -357,7 +358,7 @@ void testKafkaBridgeService(ExtensionContext extensionContext) { .withNamespaceName(testStorage.getNamespaceName()) .withBootstrapAddress(KafkaBridgeResources.serviceName(testStorage.getClusterName())) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT) .withDelayMs(1000) .withPollInterval(1000) @@ -366,7 +367,7 @@ void testKafkaBridgeService(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, kafkaBridgeClientJob.producerStrimziBridgeWithTracing()); resourceManager.createResourceWithWait(extensionContext, (testStorage.getTracingClients()).consumerWithTracing()); - ClientUtils.waitForClientSuccess(bridgeProducer, testStorage.getNamespaceName(), MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(bridgeProducer, testStorage.getNamespaceName(), testStorage.getMessageCount()); TracingUtils.verify(testStorage.getNamespaceName(), JAEGER_KAFKA_BRIDGE_SERVICE, testStorage.getScraperPodName(), JAEGER_QUERY_SERVICE); } @@ -416,7 +417,7 @@ void testKafkaBridgeServiceWithHttpTracing(ExtensionContext extensionContext) { .withNamespaceName(testStorage.getNamespaceName()) 
.withBootstrapAddress(KafkaBridgeResources.serviceName(testStorage.getClusterName())) .withTopicName(testStorage.getTopicName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withPort(TestConstants.HTTP_BRIDGE_DEFAULT_PORT) .withDelayMs(1000) .withPollInterval(1000) @@ -425,7 +426,7 @@ void testKafkaBridgeServiceWithHttpTracing(ExtensionContext extensionContext) { resourceManager.createResourceWithWait(extensionContext, kafkaBridgeClientJob.producerStrimziBridgeWithTracing()); resourceManager.createResourceWithWait(extensionContext, (testStorage.getTracingClients()).consumerWithTracing()); - ClientUtils.waitForClientSuccess(bridgeProducer, testStorage.getNamespaceName(), MESSAGE_COUNT); + ClientUtils.waitForClientSuccess(bridgeProducer, testStorage.getNamespaceName(), testStorage.getMessageCount()); TracingUtils.verify(testStorage.getNamespaceName(), JAEGER_KAFKA_BRIDGE_SERVICE, testStorage.getScraperPodName(), JAEGER_QUERY_SERVICE); TracingUtils.verify(testStorage.getNamespaceName(), bridgeProducer, testStorage.getScraperPodName(), JAEGER_QUERY_SERVICE); @@ -447,7 +448,7 @@ void createTestResources(final ExtensionContext extensionContext) { .withBootstrapAddress(KafkaResources.plainBootstrapAddress(testStorage.getClusterName())) .withTopicName(testStorage.getTopicName()) .withStreamsTopicTargetName(testStorage.getStreamsTopicTargetName()) - .withMessageCount(MESSAGE_COUNT) + .withMessageCount(testStorage.getMessageCount()) .withJaegerServiceProducerName(JAEGER_PRODUCER_SERVICE) .withJaegerServiceConsumerName(JAEGER_CONSUMER_SERVICE) .withJaegerServiceStreamsName(JAEGER_KAFKA_STREAMS_SERVICE) diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java index 571fc9d96b3..bc3fdeb85f2 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java +++ 
b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java @@ -60,7 +60,7 @@ void testDowngradeOfKafkaConnectAndKafkaConnector(final ExtensionContext extensi @SuppressWarnings("MethodLength") private void performDowngrade(BundleVersionModificationData downgradeData, ExtensionContext extensionContext) throws IOException { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); String lowerMetadataVersion = downgradeData.getProcedures().getMetadataVersion(); UpgradeKafkaVersion testUpgradeKafkaVersion = new UpgradeKafkaVersion(downgradeData.getDeployKafkaVersion(), lowerMetadataVersion); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java index 59871949187..5d0bbf9c2ed 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java @@ -65,7 +65,7 @@ void testUpgradeKafkaWithoutVersion(ExtensionContext extensionContext) throws IO UpgradeKafkaVersion upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(acrossUpgradeData.getFromKafkaVersionsUrl(), acrossUpgradeData.getStartingKafkaVersion()); upgradeKafkaVersion.setVersion(null); - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // Setup env setupEnvAndUpgradeClusterOperator(extensionContext, acrossUpgradeData, testStorage, upgradeKafkaVersion, TestConstants.CO_NAMESPACE); @@ -106,7 +106,7 @@ void testUpgradeKafkaWithoutVersion(ExtensionContext extensionContext) throws IO @IsolatedTest void testUpgradeAcrossVersionsWithUnsupportedKafkaVersion(ExtensionContext extensionContext) throws IOException { - TestStorage testStorage = new 
TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); UpgradeKafkaVersion upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(acrossUpgradeData.getFromKafkaVersionsUrl(), acrossUpgradeData.getStartingKafkaVersion()); // Setup env @@ -141,7 +141,7 @@ void testUpgradeAcrossVersionsWithUnsupportedKafkaVersion(ExtensionContext exten @IsolatedTest void testUpgradeAcrossVersionsWithNoKafkaVersion(ExtensionContext extensionContext) throws IOException { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // Setup env setupEnvAndUpgradeClusterOperator(extensionContext, acrossUpgradeData, testStorage, null, TestConstants.CO_NAMESPACE); @@ -183,7 +183,7 @@ void testUpgradeOfKafkaConnectAndKafkaConnector(final ExtensionContext extension } private void performUpgrade(BundleVersionModificationData upgradeData, ExtensionContext extensionContext) throws IOException { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // leave empty, so the original Kafka version from appropriate Strimzi's yaml will be used UpgradeKafkaVersion upgradeKafkaVersion = new UpgradeKafkaVersion(); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java index 5157fb28f69..55766d0258b 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java @@ -68,7 +68,7 @@ void testDowngradeOfKafkaConnectAndKafkaConnector(final ExtensionContext extensi @SuppressWarnings("MethodLength") private void performDowngrade(BundleVersionModificationData downgradeData, ExtensionContext extensionContext) throws IOException { - TestStorage 
testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); UpgradeKafkaVersion testUpgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(downgradeData.getFromKafkaVersionsUrl(), downgradeData.getDeployKafkaVersion()); // Setup env diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java index cd4b03aa46b..5ed25152eca 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java @@ -69,7 +69,7 @@ void testUpgradeKafkaWithoutVersion(ExtensionContext extensionContext) throws IO UpgradeKafkaVersion upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(acrossUpgradeData.getFromKafkaVersionsUrl(), acrossUpgradeData.getStartingKafkaVersion()); upgradeKafkaVersion.setVersion(null); - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // Setup env setupEnvAndUpgradeClusterOperator(extensionContext, acrossUpgradeData, testStorage, upgradeKafkaVersion, TestConstants.CO_NAMESPACE); @@ -105,7 +105,7 @@ void testUpgradeKafkaWithoutVersion(ExtensionContext extensionContext) throws IO @Test void testUpgradeAcrossVersionsWithUnsupportedKafkaVersion(ExtensionContext extensionContext) throws IOException { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); UpgradeKafkaVersion upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(acrossUpgradeData.getFromKafkaVersionsUrl(), acrossUpgradeData.getStartingKafkaVersion()); // Setup env @@ -132,7 +132,7 @@ void testUpgradeAcrossVersionsWithUnsupportedKafkaVersion(ExtensionContext exten @Test void 
testUpgradeAcrossVersionsWithNoKafkaVersion(ExtensionContext extensionContext) throws IOException { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // Setup env setupEnvAndUpgradeClusterOperator(extensionContext, acrossUpgradeData, testStorage, null, TestConstants.CO_NAMESPACE); @@ -167,7 +167,7 @@ void testUpgradeOfKafkaConnectAndKafkaConnector(final ExtensionContext extension } private void performUpgrade(BundleVersionModificationData upgradeData, ExtensionContext extensionContext) throws IOException { - TestStorage testStorage = new TestStorage(extensionContext); + final TestStorage testStorage = new TestStorage(extensionContext); // leave empty, so the original Kafka version from appropriate Strimzi's yaml will be used UpgradeKafkaVersion upgradeKafkaVersion = new UpgradeKafkaVersion(); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/watcher/AbstractNamespaceST.java b/systemtest/src/test/java/io/strimzi/systemtest/watcher/AbstractNamespaceST.java index bdad2144661..e017c9a66ea 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/watcher/AbstractNamespaceST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/watcher/AbstractNamespaceST.java @@ -295,7 +295,7 @@ private void deployKafkaConnectorWithSink(ExtensionContext extensionContext, Str // Deploy Kafka Connector Map connectorConfig = new HashMap<>(); - connectorConfig.put("topics", TOPIC_NAME); + connectorConfig.put("topics", testStorage.getTopicName()); connectorConfig.put("file", TestConstants.DEFAULT_SINK_FILE_PATH); connectorConfig.put("key.converter", "org.apache.kafka.connect.storage.StringConverter"); connectorConfig.put("value.converter", "org.apache.kafka.connect.storage.StringConverter"); @@ -317,7 +317,7 @@ private void deployKafkaConnectorWithSink(ExtensionContext extensionContext, Str KafkaConnectUtils.waitUntilKafkaConnectRestApiIsAvailable(testStorage.getNamespaceName(), 
kafkaConnectPodName); KafkaClients kafkaClients = new KafkaClientsBuilder() - .withTopicName(TOPIC_NAME) + .withTopicName(testStorage.getTopicName()) .withMessageCount(testStorage.getMessageCount()) .withBootstrapAddress(KafkaResources.plainBootstrapAddress(PRIMARY_KAFKA_NAME)) .withProducerName(testStorage.getProducerName())