diff --git a/systemtest/src/main/java/io/strimzi/systemtest/security/OpenSsl.java b/systemtest/src/main/java/io/strimzi/systemtest/security/OpenSsl.java new file mode 100644 index 00000000000..aa4d0474e02 --- /dev/null +++ b/systemtest/src/main/java/io/strimzi/systemtest/security/OpenSsl.java @@ -0,0 +1,197 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.systemtest.security; + +import io.strimzi.systemtest.Constants; +import io.strimzi.test.TestUtils; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * The `OpenSsl` class encapsulates OpenSSL command execution using the OpenSSLCommand object, + * which interfaces with the command-line version of OpenSSL. It serves as a versatile tool + * for various OpenSSL operations, primarily focusing on the creation of private keys, the + * generation of certificate signing requests (CSRs), and the signing of these CSRs using + * a certificate authority (CA). The primary use case for this class is to facilitate the + * simulation of externally provided client certificates, offering a seamless solution for + * integrating secure authentication mechanisms into your application. + */ +public class OpenSsl { + private static final Logger LOGGER = LogManager.getLogger(OpenSsl.class); + + private static class OpenSslCommand { + ProcessBuilder pb = new ProcessBuilder(); + + public OpenSslCommand(String command) { + this("openssl", command); + } + + public OpenSslCommand(String binary, String command) { + pb.command().add(binary); + pb.command().add(command); + } + + public OpenSslCommand withOption(String option) { + pb.command().add(option); + return this; + } + + public OpenSslCommand withOptionAndArgument(String option, File argument) { + pb.command().add(option); + pb.command().add(argument.getAbsolutePath()); + return this; + } + + public OpenSslCommand withOptionAndArgument(String option, String argument) { + pb.command().add(option); + pb.command().add(argument); + return this; + } + + public void execute() { + executeAndReturnOnSuccess(true); + } + + public String executeAndReturn() { + return executeAndReturnOnSuccess(true); + } + + public String executeAndReturnOnSuccess(boolean failOnNonZeroOutput) { + + Path commandOutput = null; + try { + commandOutput = Files.createTempFile("openssl-command-output-", ".txt"); + + pb.redirectErrorStream(true) + .redirectOutput(commandOutput.toFile()); + + LOGGER.debug("Running command: {}", pb.command()); + + Process process = pb.start(); + + OutputStream outputStream = process.getOutputStream(); + outputStream.close(); + + int exitCode = process.waitFor(); + String outputText = Files.readString(commandOutput, StandardCharsets.UTF_8); + + if (exitCode != 0 && failOnNonZeroOutput) { + throw new RuntimeException("Openssl command failed. " + outputText); + } + + return outputText; + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } finally { + removeFile(commandOutput); + } + } + + static void removeFile(Path fileToRemove) { + if (fileToRemove != null && Files.exists(fileToRemove)) { + try { + Files.delete(fileToRemove); + } catch (IOException e) { + LOGGER.debug("File could not be removed: {}", fileToRemove); + } + } + + } + } + + public static File generatePrivateKey() { + return generatePrivateKey(2048); + } + + public static File generatePrivateKey(int keyLengthBits) { + try { + LOGGER.info("Creating client RSA private key with size of {} bits", keyLengthBits); + File privateKey = Files.createTempFile("private-key-", ".pem").toFile(); + + new OpenSslCommand("genpkey") + .withOptionAndArgument("-algorithm", "RSA") + .withOptionAndArgument("-pkeyopt", "rsa_keygen_bits:" + keyLengthBits) + .withOptionAndArgument("-out", privateKey) + .execute(); + + return privateKey; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static File generateCertSigningRequest(File privateKey, String subject) { + try { + LOGGER.info("Creating Certificate Signing Request file"); + File csr = Files.createTempFile("csr-", ".pem").toFile(); + + new OpenSslCommand("req") + .withOption("-new") + .withOptionAndArgument("-key", privateKey) + .withOptionAndArgument("-out", csr) + .withOptionAndArgument("-subj", subject) + .execute(); + + return csr; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static File generateSignedCert(File csr, File caCrt, File caKey) { + try { + LOGGER.info("Creating signed certificate file"); + File cert = Files.createTempFile("signed-cert-", ".pem").toFile(); + + new OpenSslCommand("x509") + .withOption("-req") + .withOptionAndArgument("-in", csr) + .withOptionAndArgument("-CA", caCrt) + .withOptionAndArgument("-CAkey", caKey) + .withOptionAndArgument("-out", cert) + .withOption("-CAcreateserial") + .execute(); + + waitForCertIsInValidDateRange(cert); + + return cert; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static void waitForCertIsInValidDateRange(File certificate) { + String dates = new OpenSslCommand("x509") + .withOption("-noout") + .withOption("-dates") + .withOptionAndArgument("-in", certificate) + .executeAndReturn() + .trim().replace("\s\s", "\s"); + + String startDate = dates.split("\n")[0].replace("notBefore=", ""); + String endDate = dates.split("\n")[1].replace("notAfter=", ""); + + ZoneId gmtZone = ZoneId.of("GMT"); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MMM dd HH:mm:ss yyyy z"); + ZonedDateTime notBefore = ZonedDateTime.of(LocalDateTime.parse(startDate, formatter), gmtZone); + ZonedDateTime notAfter = ZonedDateTime.of(LocalDateTime.parse(endDate, formatter), gmtZone); + + TestUtils.waitFor("certificate to be in valid date range", Constants.POLL_INTERVAL_FOR_RESOURCE_READINESS, Constants.CO_OPERATION_TIMEOUT_SHORT, + () -> { + ZonedDateTime now = ZonedDateTime.now(gmtZone); + return (now.isAfter(notBefore.plusSeconds(4)) && now.isBefore(notAfter.minusSeconds(3))); + }); + } +} diff --git a/systemtest/src/main/java/io/strimzi/systemtest/security/SystemTestCertManager.java b/systemtest/src/main/java/io/strimzi/systemtest/security/SystemTestCertManager.java index d2377322b03..0c35c3ee476 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/security/SystemTestCertManager.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/security/SystemTestCertManager.java @@ -4,6 +4,7 @@ */ package io.strimzi.systemtest.security; +import java.nio.charset.StandardCharsets; import org.bouncycastle.asn1.ASN1Encodable; import org.bouncycastle.asn1.ASN1Encoding; import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; @@ -169,6 +170,35 @@ private static File exportCertsToPemFile(SystemTestCertAndKey... certs) throws I return certFile; } + /** + * This method exports Certificate Authority (CA) data to a temporary file for cases in which mentioned data is + * necessary in form of file - for use in applications like OpenSSL. The primary purpose is to save CA files, + * such as certificates and private keys (e.g., ca.key and ca.cert), into temporary files. + * These files are essential when you need to provide CA data to other applications, such as OpenSSL, + * for signing user Certificate Signing Requests (CSRs). + * + * @param caData The Certificate Authority data to be saved to the temporary file. + * @param prefix The prefix for the temporary file's name. + * @param suffix The suffix for the temporary file's name. + * @return A File object representing the temporary file containing the CA data. + * @throws RuntimeException If an IOException occurs while creating a file or writing into the temporary file + * given the critical role these operations play in ensuring proper functionality. + */ + public static File exportCaDataToFile(String caData, String prefix, String suffix) { + try { + File tempFile = Files.createTempFile(prefix + "-", suffix).toFile(); + + try (FileWriter fileWriter = new FileWriter(tempFile, StandardCharsets.UTF_8)) { + fileWriter.write(caData); + fileWriter.flush(); + } + + return tempFile; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public static boolean containsAllDN(String principal1, String principal2) { try { return new LdapName(principal1).getRdns().containsAll(new LdapName(principal2).getRdns()); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/operators/user/UserST.java b/systemtest/src/test/java/io/strimzi/systemtest/operators/user/UserST.java index 1a5479f1c6b..1693c97f430 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/operators/user/UserST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/operators/user/UserST.java @@ -6,17 +6,17 @@ import io.fabric8.kubernetes.api.model.DeletionPropagation; import io.fabric8.kubernetes.api.model.Secret; +import io.fabric8.kubernetes.api.model.SecretBuilder; import io.strimzi.api.kafka.Crds; import io.strimzi.api.kafka.model.AclOperation; -import io.strimzi.api.kafka.model.AclRule; -import io.strimzi.api.kafka.model.AclRuleBuilder; +import io.strimzi.api.kafka.model.AclResourcePatternType; +import io.strimzi.api.kafka.model.listener.KafkaListenerAuthenticationTls; import io.strimzi.api.kafka.model.KafkaResources; import io.strimzi.api.kafka.model.KafkaUser; import io.strimzi.api.kafka.model.KafkaUserScramSha512ClientAuthentication; import io.strimzi.api.kafka.model.listener.arraylistener.GenericKafkaListenerBuilder; import io.strimzi.api.kafka.model.listener.arraylistener.KafkaListenerType; import io.strimzi.api.kafka.model.status.Condition; -import io.strimzi.operator.common.model.Labels; import io.strimzi.systemtest.AbstractST; import io.strimzi.systemtest.Constants; import io.strimzi.systemtest.Environment; @@ -26,17 +26,24 @@ import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients; import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClientsBuilder; import io.strimzi.systemtest.resources.crd.KafkaUserResource; +import io.strimzi.systemtest.security.OpenSsl; +import io.strimzi.systemtest.security.SystemTestCertManager; import io.strimzi.systemtest.storage.TestStorage; import io.strimzi.systemtest.templates.crd.KafkaTemplates; import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates; import io.strimzi.systemtest.templates.crd.KafkaUserTemplates; import io.strimzi.systemtest.templates.specific.ScraperTemplates; import io.strimzi.systemtest.utils.ClientUtils; -import io.strimzi.systemtest.utils.StUtils; import io.strimzi.systemtest.utils.TestKafkaVersion; import io.strimzi.systemtest.utils.kafkaUtils.KafkaUserUtils; import io.strimzi.systemtest.utils.kubeUtils.objects.SecretUtils; import io.strimzi.test.TestUtils; +import io.strimzi.test.k8s.KubeClusterResource; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Base64; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.BeforeAll; @@ -47,13 +54,10 @@ import static io.strimzi.systemtest.Constants.REGRESSION; import static io.strimzi.systemtest.enums.CustomResourceStatus.NotReady; import static io.strimzi.systemtest.enums.CustomResourceStatus.Ready; -import static io.strimzi.test.k8s.KubeClusterResource.cmdKubeClient; import static io.strimzi.test.k8s.KubeClusterResource.kubeClient; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -340,123 +344,100 @@ void testCreatingUsersWithSecretPrefix(ExtensionContext extensionContext) { } @ParallelNamespaceTest - void testTlsExternalUser(ExtensionContext extensionContext) { + void testTlsExternalUser(ExtensionContext extensionContext) throws IOException, InterruptedException { final TestStorage testStorage = storageMap.get(extensionContext); - final String namespaceName = StUtils.getNamespaceBasedOnRbac(Environment.TEST_SUITE_NAMESPACE, extensionContext); - final String clusterName = testStorage.getClusterName(); - final String userName = testStorage.getKafkaUsername(); - final String topicName = testStorage.getTopicName(); - - final AclRule aclRule = new AclRuleBuilder() - .withNewAclRuleTopicResource() - .withName(topicName) - .endAclRuleTopicResource() - .withOperations(AclOperation.WRITE, AclOperation.DESCRIBE) - .build(); + String consumerGroupName = ClientUtils.generateRandomConsumerGroup(); - // exercise (a) - create Kafka cluster with support for authorization - resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(clusterName, 1, 1) + resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(testStorage.getClusterName(), 1, 1) .editSpec() .editKafka() .withNewKafkaAuthorizationSimple() .endKafkaAuthorizationSimple() + .withListeners(new GenericKafkaListenerBuilder() + .withName(Constants.TLS_LISTENER_DEFAULT_NAME) + .withPort(9093) + .withType(KafkaListenerType.INTERNAL) + .withTls(true) + .withAuth(new KafkaListenerAuthenticationTls()) + .build()) .endKafka() .endSpec() .build()); - // quotas configuration - final int prodRate = 1212; - final int consRate = 2121; - final int requestPerc = 21; - final double mutRate = 5d; + resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), testStorage.getNamespaceName()).build()); - final KafkaUser tlsExternalUserWithQuotasAndAcls = KafkaUserTemplates.tlsExternalUser(namespaceName, clusterName, userName) + final KafkaUser tlsExternalUserWithQuotasAndAcls = KafkaUserTemplates.tlsExternalUser(testStorage.getNamespaceName(), testStorage.getClusterName(), testStorage.getKafkaUsername()) .editSpec() .withNewKafkaUserAuthorizationSimple() - .addToAcls(aclRule) + .addNewAcl() + .withNewAclRuleTopicResource() + .withPatternType(AclResourcePatternType.LITERAL) + .withName(testStorage.getTopicName()) + .endAclRuleTopicResource() + .withOperations(AclOperation.READ, AclOperation.WRITE, AclOperation.DESCRIBE, AclOperation.CREATE) + .endAcl() + .addNewAcl() + .withNewAclRuleGroupResource() + .withPatternType(AclResourcePatternType.LITERAL) + .withName(consumerGroupName) + .endAclRuleGroupResource() + .withOperations(AclOperation.READ) + .endAcl() .endKafkaUserAuthorizationSimple() - .withNewQuotas() - .withConsumerByteRate(consRate) - .withProducerByteRate(prodRate) - .withRequestPercentage(requestPerc) - .withControllerMutationRate(mutRate) - .endQuotas() .endSpec() .build(); - - // exercise (b) - create the KafkaUser with tls external client authentication resourceManager.createResourceWithWait(extensionContext, tlsExternalUserWithQuotasAndAcls); - // verify (a) - that secrets are not generated and KafkaUser is created - KafkaUserUtils.waitForKafkaUserReady(namespaceName, userName); - assertThat(kubeClient().getSecret(namespaceName, userName), nullValue()); - - // verify (b) - if the operator has the right username in the status, that is what it also used in the ACLs and Quotas - KafkaUser user = KafkaUserResource.kafkaUserClient().inNamespace(namespaceName).withName(userName).get(); - - assertThat(user.getStatus().getUsername(), is("CN=" + userName)); - } - - /** - * @description This test case checks that KafkaUser custom resource is managed correctly, even in presence of more than just one User Operator. - * - * @steps - * 1. - In addition to already existing Kafka Cluster (A), deploy another Kafka Cluster (B) in the same Namespace - * - Kafka (B) is deployed - * 2. - Create KafkaUser custom resource with metadata due to which it will be listened to by User Operator belonging to the Kafka Cluster B - * - Secret with label referencing Kafka Cluster B is created, and Kafka resource user corresponding to created KafkaUser custom resource exists only in Kafka Cluster B - * - * @usecase - * - labels - * - user-operator - * - users - * - secrets - */ - @ParallelTest - void testUOListeningOnlyUsersInSameCluster(ExtensionContext extensionContext) { - // skip test if KRaft mode is enabled and Kafka version is lower than 3.5.0 - https://github.com/strimzi/strimzi-kafka-operator/issues/8806 - assumeFalse(Environment.isKRaftModeEnabled() && TestKafkaVersion.compareDottedVersions("3.5.0", Environment.ST_KAFKA_VERSION) == 1); - - final TestStorage testStorage = new TestStorage(extensionContext, Environment.TEST_SUITE_NAMESPACE); - final String userListeningClusterName = "user-listening-cluster"; - final String userIgnoringClusterName = userClusterName; // pre-created shared Kafka will be used as the secondary cluster (its UO ignoring KafkaUser with diff label) - - LOGGER.info("Creating new Kafka: {}/{}, whose TO will listen to soon to be created KafkaUser CR", Environment.TEST_SUITE_NAMESPACE, userListeningClusterName); - resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaEphemeral(userListeningClusterName, 3, 1) - .editMetadata() - .withNamespace(Environment.TEST_SUITE_NAMESPACE) + // For clients of authentication type tls-external, operator should not create a secret + KafkaUserUtils.waitForKafkaUserReady(testStorage.getNamespaceName(), testStorage.getKafkaUsername()); + assertThat(kubeClient().getSecret(testStorage.getNamespaceName(), testStorage.getKafkaUsername()), nullValue()); + assertThat(KafkaUserResource.kafkaUserClient().inNamespace(testStorage.getNamespaceName()).withName(testStorage.getKafkaUsername()).get().getStatus().getUsername(), is("CN=" + testStorage.getKafkaUsername())); + + // To test tls-external authentication, client needs to have an external certificates provided + // To simulate externally provided certs, these steps are done: + // 1. Generate private key and csr (containing at least common name CN) for user + // 2. Generate a certificate by signing CSR using private key from secret generated for clients by operator + // 3. Create user secret from private key, generated certificate and certificate from secret created for clients by operator + + File clientPrivateKey = OpenSsl.generatePrivateKey(); + + File csr = OpenSsl.generateCertSigningRequest(clientPrivateKey, "/CN=" + testStorage.getKafkaUsername()); + String caCrt = KubeClusterResource.kubeClient(testStorage.getNamespaceName()).getSecret(KafkaResources.clientsCaCertificateSecretName(testStorage.getClusterName())).getData().get("ca.crt"); + String caKey = KubeClusterResource.kubeClient(testStorage.getNamespaceName()).getSecret(KafkaResources.clientsCaKeySecretName(testStorage.getClusterName())).getData().get("ca.key"); + + File clientCert = OpenSsl.generateSignedCert(csr, + SystemTestCertManager.exportCaDataToFile(new String(Base64.getDecoder().decode(caCrt), StandardCharsets.UTF_8), "ca", ".crt"), + SystemTestCertManager.exportCaDataToFile(new String(Base64.getDecoder().decode(caKey), StandardCharsets.UTF_8), "ca", ".key")); + + Secret secretBuilder = new SecretBuilder() + .withApiVersion("v1") + .withKind("Secret") + .withNewMetadata() + .withName(testStorage.getKafkaUsername()) + .withNamespace(testStorage.getNamespaceName()) .endMetadata() - .build()); - - LOGGER.info("Creating new KafkaUser: {}/{}", Environment.TEST_SUITE_NAMESPACE, testStorage.getUsername()); - final KafkaUser user = KafkaUserTemplates.tlsUser(Environment.TEST_SUITE_NAMESPACE, userListeningClusterName, testStorage.getUsername()).build(); - resourceManager.createResourceWithWait(extensionContext, KafkaUserTemplates.userWithQuotas(user, 123, 123, 12, 10d) - .editMetadata() - .withNamespace(Environment.TEST_SUITE_NAMESPACE) - .endMetadata() - .build()); - - LOGGER.info("Verifying KafkaUser: {}/{} is created in Kafka: {}/{} ", Environment.TEST_SUITE_NAMESPACE, testStorage.getUsername(), Environment.TEST_SUITE_NAMESPACE, userListeningClusterName); - String entityOperatorPodName = kubeClient().listPodNamesInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, Labels.STRIMZI_NAME_LABEL, KafkaResources.entityOperatorDeploymentName(userListeningClusterName)).get(0); - String uOLogs = kubeClient().logsInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, entityOperatorPodName, "user-operator"); - assertThat(uOLogs, containsString("KafkaUser " + testStorage.getUsername() + " in namespace " + Environment.TEST_SUITE_NAMESPACE + " was ADDED")); - - LOGGER.info("Verifying KafkaUser: {}/{} is not created in Kafka: {}/{}", Environment.TEST_SUITE_NAMESPACE, testStorage.getUsername(), Environment.TEST_SUITE_NAMESPACE, userIgnoringClusterName); - entityOperatorPodName = kubeClient().listPodNamesInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, Labels.STRIMZI_NAME_LABEL, KafkaResources.entityOperatorDeploymentName(userIgnoringClusterName)).get(0); - uOLogs = kubeClient().logsInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, entityOperatorPodName, "user-operator"); - assertThat(uOLogs, not(containsString("KafkaUser " + testStorage.getUsername() + " in namespace " + Environment.TEST_SUITE_NAMESPACE + " was ADDED"))); - - LOGGER.info("Verifying KafkaUser: {}/{} has label: {}={} corresponding to Kafka cluster it belongs to", Environment.TEST_SUITE_NAMESPACE, testStorage.getUsername(), Labels.STRIMZI_CLUSTER_LABEL, userListeningClusterName); - String kafkaUserResource = cmdKubeClient(Environment.TEST_SUITE_NAMESPACE).getResourceAsYaml("kafkauser", testStorage.getUsername()); - assertThat(kafkaUserResource, containsString(Labels.STRIMZI_CLUSTER_LABEL + ": " + userListeningClusterName)); + .addToData("ca.crt", caCrt) + .addToData("user.crt", Base64.getEncoder().encodeToString(Files.readAllBytes(clientCert.toPath()))) + .addToData("user.key", Base64.getEncoder().encodeToString(Files.readAllBytes(clientPrivateKey.toPath()))) + .withType("Opaque") + .build(); - SecretUtils.waitForSpecificLabelKeyValue(testStorage.getUsername(), Environment.TEST_SUITE_NAMESPACE, Labels.STRIMZI_CLUSTER_LABEL, userListeningClusterName); + kubeClient().namespace(testStorage.getNamespaceName()).createSecret(secretBuilder); + SecretUtils.waitForSecretReady(testStorage.getNamespaceName(), testStorage.getKafkaUsername(), () -> { }); - KafkaUserUtils.waitForKafkaUserMappingIntoKafkaResource(Environment.TEST_SUITE_NAMESPACE, testStorage.getUsername(), userListeningClusterName, scraperPodName); + KafkaClients kafkaClients = new KafkaClientsBuilder() + .withProducerName(testStorage.getProducerName()) + .withConsumerName(testStorage.getConsumerName()) + .withNamespaceName(testStorage.getNamespaceName()) + .withMessageCount(testStorage.getMessageCount()) + .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(testStorage.getClusterName())) + .withTopicName(testStorage.getTopicName()) + .withUsername(testStorage.getKafkaUsername()) + .withConsumerGroup(consumerGroupName) + .build(); - LOGGER.info("Verifying KafkaUser: {}/{} is not present in Kafka ecosystem in Kafka/{}", Environment.TEST_SUITE_NAMESPACE, testStorage.getUsername(), userIgnoringClusterName); - String getUserResult = KafkaCmdClient.describeUserUsingPodCli(Environment.TEST_SUITE_NAMESPACE, scraperPodName, KafkaResources.plainBootstrapAddress(userIgnoringClusterName), "CN=" + testStorage.getUsername()); - assertThat("KafkaUser CR is not mapped as Kafka resource user", getUserResult, not(containsString(testStorage.getUsername()))); + resourceManager.createResourceWithWait(extensionContext, kafkaClients.producerTlsStrimzi(testStorage.getClusterName()), kafkaClients.consumerTlsStrimzi(testStorage.getClusterName())); + ClientUtils.waitForClientsSuccess(testStorage); } @BeforeAll