From 448e7b6b42f41010ed3b7ae3cecfdeb2d8f84ddf Mon Sep 17 00:00:00 2001 From: Jakub Scholz Date: Thu, 7 Mar 2024 17:48:46 +0100 Subject: [PATCH 1/2] Fix wrong escaping of OAuth configuration in Mirror Maker 2 - Closes #9786 Signed-off-by: Jakub Scholz --- .../model/KafkaMirrorMaker2Connectors.java | 4 +- .../KafkaMirrorMaker2ConnectorsTest.java | 7 +- .../systemtest/security/oauth/OauthTlsST.java | 192 ++++++++++++++++++ 3 files changed, 200 insertions(+), 3 deletions(-) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java index 90b54414270..fa3d12ad25a 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java @@ -282,8 +282,8 @@ private static String oauthJaasConfig(KafkaMirrorMaker2ClusterSpec cluster, Kafk } if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) { - jaasOptions.put("oauth.ssl.truststore.location", "/tmp/kafka/clusters/\"" + cluster.getAlias() + "\"-oauth.truststore.p12"); - jaasOptions.put("oauth.ssl.truststore.password", "\"${file:" + CONNECTORS_CONFIG_FILE + ":oauth.ssl.truststore.password}\""); + jaasOptions.put("oauth.ssl.truststore.location", "/tmp/kafka/clusters/" + cluster.getAlias() + "-oauth.truststore.p12"); + jaasOptions.put("oauth.ssl.truststore.password", "${file:" + CONNECTORS_CONFIG_FILE + ":oauth.ssl.truststore.password}"); jaasOptions.put("oauth.ssl.truststore.type", "PKCS12"); } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java index f74c032fab9..1ae255bb06b 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java @@ -5,6 +5,7 @@ package io.strimzi.operator.cluster.model; import io.strimzi.api.kafka.model.common.CertAndKeySecretSourceBuilder; +import io.strimzi.api.kafka.model.common.CertSecretSourceBuilder; import io.strimzi.api.kafka.model.common.ConnectorState; import io.strimzi.api.kafka.model.connector.KafkaConnector; import io.strimzi.api.kafka.model.mirrormaker2.KafkaMirrorMaker2; @@ -615,6 +616,7 @@ public void testAddClusterToMirrorMaker2ConnectorConfigWithOauth() { .withKey("refreshTokenKey") .withSecretName("refreshTokenSecretName") .endRefreshToken() + .withTlsTrustedCertificates(new CertSecretSourceBuilder().withCertificate("ca.crt").withSecretName("my-oauth-secret").build()) .endKafkaClientAuthenticationOAuth() .build(); @@ -627,7 +629,10 @@ public void testAddClusterToMirrorMaker2ConnectorConfigWithOauth() { is(Map.of("oauth.client.secret", "${file:/tmp/strimzi-mirrormaker2-connector.properties:sourceClusterAlias.oauth.client.secret}", "oauth.access.token", "${file:/tmp/strimzi-mirrormaker2-connector.properties:sourceClusterAlias.oauth.access.token}", "oauth.refresh.token", "${file:/tmp/strimzi-mirrormaker2-connector.properties:sourceClusterAlias.oauth.refresh.token}", - "oauth.password.grant.password", "${file:/tmp/strimzi-mirrormaker2-connector.properties:sourceClusterAlias.oauth.password.grant.password}"))); + "oauth.password.grant.password", "${file:/tmp/strimzi-mirrormaker2-connector.properties:sourceClusterAlias.oauth.password.grant.password}", + "oauth.ssl.truststore.location", "/tmp/kafka/clusters/sourceClusterAlias-oauth.truststore.p12", + "oauth.ssl.truststore.type", "PKCS12", + "oauth.ssl.truststore.password", "${file:/tmp/strimzi-mirrormaker2-connector.properties:oauth.ssl.truststore.password}"))); assertThat(config, is(Map.of("prefix.alias", "sourceClusterAlias", diff --git a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java index 36134296904..dc7fd8312c8 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java @@ -13,6 +13,9 @@ import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; import io.strimzi.api.kafka.model.mirrormaker.KafkaMirrorMakerResources; +import io.strimzi.api.kafka.model.mirrormaker2.KafkaMirrorMaker2; +import io.strimzi.api.kafka.model.mirrormaker2.KafkaMirrorMaker2ClusterSpec; +import io.strimzi.api.kafka.model.mirrormaker2.KafkaMirrorMaker2ClusterSpecBuilder; import io.strimzi.operator.common.model.Labels; import io.strimzi.systemtest.Environment; import io.strimzi.systemtest.TestConstants; @@ -31,6 +34,7 @@ import io.strimzi.systemtest.storage.TestStorage; import io.strimzi.systemtest.templates.crd.KafkaBridgeTemplates; import io.strimzi.systemtest.templates.crd.KafkaConnectTemplates; +import io.strimzi.systemtest.templates.crd.KafkaMirrorMaker2Templates; import io.strimzi.systemtest.templates.crd.KafkaMirrorMakerTemplates; import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates; import io.strimzi.systemtest.templates.crd.KafkaTemplates; @@ -41,14 +45,21 @@ import io.strimzi.systemtest.utils.kafkaUtils.KafkaConnectUtils; import io.strimzi.systemtest.utils.kafkaUtils.KafkaConnectorUtils; import io.strimzi.systemtest.utils.kafkaUtils.KafkaUserUtils; +import io.strimzi.systemtest.utils.kubeUtils.controllers.JobUtils; +import io.strimzi.test.TestUtils; +import io.strimzi.test.WaitException; +import io.strimzi.test.k8s.KubeClusterResource; import io.vertx.core.cli.annotations.Description; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; +import java.time.Duration; + import static io.strimzi.systemtest.TestConstants.ACCEPTANCE; import static io.strimzi.systemtest.TestConstants.ARM64_UNSUPPORTED; import static io.strimzi.systemtest.TestConstants.BRIDGE; @@ -412,6 +423,187 @@ void testMirrorMaker() { ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); } + @Description("As a OAuth MirrorMaker 2, I am able to replicate Topic data using using encrypted communication") + @IsolatedTest("Using more tha one Kafka cluster in one Namespace") + @Tag(MIRROR_MAKER) + @Tag(NODEPORT_SUPPORTED) + @SuppressWarnings({"checkstyle:MethodLength"}) + void testMirrorMaker2() { + // Nodeport needs cluster wide rights to work properly which is not possible with STRIMZI_RBAC_SCOPE=NAMESPACE + assumeFalse(Environment.isNamespaceRbacScope()); + final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); + + String producerName = OAUTH_PRODUCER_NAME + "-" + testStorage.getClusterName(); + String consumerName = OAUTH_CONSUMER_NAME + "-" + testStorage.getClusterName(); + + resourceManager.createResourceWithWait(KafkaTopicTemplates.topic(oauthClusterName, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build()); + + KafkaOauthClients oauthExampleClients = new KafkaOauthClientsBuilder() + .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) + .withProducerName(producerName) + .withConsumerName(consumerName) + .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName)) + .withTopicName(testStorage.getTopicName()) + .withMessageCount(testStorage.getMessageCount()) + .withOauthClientId(OAUTH_CLIENT_NAME) + .withOauthClientSecret(OAUTH_CLIENT_SECRET) + .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) + .build(); + + resourceManager.createResourceWithWait(oauthExampleClients.producerStrimziOauthTls(oauthClusterName)); + ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); + + resourceManager.createResourceWithWait(oauthExampleClients.consumerStrimziOauthTls(oauthClusterName)); + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); + + String targetKafkaCluster = oauthClusterName + "-target"; + String kafkaSourceClusterName = oauthClusterName; + + resourceManager.createResourceWithWait( + NodePoolsConverter.convertNodePoolsIfNeeded( + KafkaNodePoolTemplates.brokerPool(testStorage.getNamespaceName(), testStorage.getTargetBrokerPoolName(), targetKafkaCluster, 1).build(), + KafkaNodePoolTemplates.controllerPool(testStorage.getNamespaceName(), testStorage.getTargetControllerPoolName(), targetKafkaCluster, 1).build() + ) + ); + resourceManager.createResourceWithWait(KafkaTemplates.kafkaEphemeral(targetKafkaCluster, 1, 1) + .editMetadata() + .withNamespace(Environment.TEST_SUITE_NAMESPACE) + .endMetadata() + .editSpec() + .editKafka() + .withListeners(OauthAbstractST.BUILD_OAUTH_TLS_LISTENER.apply(keycloakInstance), + new GenericKafkaListenerBuilder() + .withName(TestConstants.EXTERNAL_LISTENER_DEFAULT_NAME) + .withPort(9094) + .withType(KafkaListenerType.NODEPORT) + .withTls(true) + .withNewKafkaListenerAuthenticationOAuth() + .withValidIssuerUri(keycloakInstance.getValidIssuerUri()) + .withJwksExpirySeconds(keycloakInstance.getJwksExpireSeconds()) + .withJwksRefreshSeconds(keycloakInstance.getJwksRefreshSeconds()) + .withJwksEndpointUri(keycloakInstance.getJwksEndpointUri()) + .withUserNameClaim(keycloakInstance.getUserNameClaim()) + .withTlsTrustedCertificates( + new CertSecretSourceBuilder() + .withSecretName(KeycloakInstance.KEYCLOAK_SECRET_NAME) + .withCertificate(KeycloakInstance.KEYCLOAK_SECRET_CERT) + .build()) + .withDisableTlsHostnameVerification(true) + .endKafkaListenerAuthenticationOAuth() + .build()) + .endKafka() + .endSpec() + .build()); + + // Deploy MirrorMaker2 with OAuth + KafkaMirrorMaker2ClusterSpec sourceClusterWithOauth = new KafkaMirrorMaker2ClusterSpecBuilder() + .withAlias(kafkaSourceClusterName) + .withConfig(connectorConfig) + .withBootstrapServers(KafkaResources.tlsBootstrapAddress(oauthClusterName)) + // this is for kafka tls connection + .withNewTls() + .withTrustedCertificates(new CertSecretSourceBuilder() + .withCertificate("ca.crt") + .withSecretName(KafkaResources.clusterCaCertificateSecretName(oauthClusterName)) + .build()) + .endTls() + .withNewKafkaClientAuthenticationOAuth() + .withTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) + .withClientId("kafka-mirror-maker-2") + .withNewClientSecret() + .withSecretName(MIRROR_MAKER_2_OAUTH_SECRET) + .withKey(OAUTH_KEY) + .endClientSecret() + .withConnectTimeoutSeconds(CONNECT_TIMEOUT_S) + .withReadTimeoutSeconds(READ_TIMEOUT_S) + // this is for authorization server tls connection + .withTlsTrustedCertificates(new CertSecretSourceBuilder() + .withSecretName(KeycloakInstance.KEYCLOAK_SECRET_NAME) + .withCertificate(KeycloakInstance.KEYCLOAK_SECRET_CERT) + .build()) + .withDisableTlsHostnameVerification(true) + .endKafkaClientAuthenticationOAuth() + .build(); + + KafkaMirrorMaker2ClusterSpec targetClusterWithOauth = new KafkaMirrorMaker2ClusterSpecBuilder() + .withAlias(testStorage.getTargetClusterName()) + .withConfig(connectorConfig) + .withBootstrapServers(KafkaResources.tlsBootstrapAddress(targetKafkaCluster)) + // this is for kafka tls connection + .withNewTls() + .withTrustedCertificates(new CertSecretSourceBuilder() + .withCertificate("ca.crt") + .withSecretName(KafkaResources.clusterCaCertificateSecretName(targetKafkaCluster)) + .build()) + .endTls() + .withNewKafkaClientAuthenticationOAuth() + .withTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) + .withClientId("kafka-mirror-maker-2") + .withNewClientSecret() + .withSecretName(MIRROR_MAKER_2_OAUTH_SECRET) + .withKey(OAUTH_KEY) + .endClientSecret() + .withConnectTimeoutSeconds(CONNECT_TIMEOUT_S) + .withReadTimeoutSeconds(READ_TIMEOUT_S) + // this is for authorization server tls connection + .withTlsTrustedCertificates(new CertSecretSourceBuilder() + .withSecretName(KeycloakInstance.KEYCLOAK_SECRET_NAME) + .withCertificate(KeycloakInstance.KEYCLOAK_SECRET_CERT) + .build()) + .withDisableTlsHostnameVerification(true) + .endKafkaClientAuthenticationOAuth() + .build(); + + String kafkaTargetClusterTopicName = kafkaSourceClusterName + "." + testStorage.getTopicName(); + + resourceManager.createResourceWithWait(KafkaMirrorMaker2Templates.kafkaMirrorMaker2(oauthClusterName, testStorage.getTargetClusterName(), kafkaSourceClusterName, 1, false) + .editMetadata() + .withNamespace(Environment.TEST_SUITE_NAMESPACE) + .endMetadata() + .editSpec() + .withClusters(sourceClusterWithOauth, targetClusterWithOauth) + .editFirstMirror() + .withSourceCluster(kafkaSourceClusterName) + .endMirror() + .endSpec() + .build()); + + final String kafkaMirrorMaker2PodName = kubeClient().listPods(Environment.TEST_SUITE_NAMESPACE, oauthClusterName, Labels.STRIMZI_KIND_LABEL, KafkaMirrorMaker2.RESOURCE_KIND).get(0).getMetadata().getName(); + final String kafkaMirrorMaker2Logs = KubeClusterResource.cmdKubeClient(Environment.TEST_SUITE_NAMESPACE).execInCurrentNamespace(Level.DEBUG, "logs", kafkaMirrorMaker2PodName).out(); + verifyOauthConfiguration(kafkaMirrorMaker2Logs); + + TestUtils.waitFor("MirrorMaker2 to copy messages from " + kafkaSourceClusterName + " to " + testStorage.getTargetClusterName(), + Duration.ofSeconds(30).toMillis(), TestConstants.TIMEOUT_FOR_MIRROR_MAKER_COPY_MESSAGES_BETWEEN_BROKERS, + () -> { + LOGGER.info("Deleting Job: {}/{}", Environment.TEST_SUITE_NAMESPACE, consumerName); + JobUtils.deleteJobWithWait(Environment.TEST_SUITE_NAMESPACE, consumerName); + + LOGGER.info("Creating new client with new consumer-group and also to point on {} cluster", testStorage.getTargetClusterName()); + + KafkaOauthClients kafkaOauthClientJob = new KafkaOauthClientsBuilder() + .withNamespaceName(Environment.TEST_SUITE_NAMESPACE) + .withProducerName(producerName) + .withConsumerName(consumerName) + .withBootstrapAddress(KafkaResources.tlsBootstrapAddress(targetKafkaCluster)) + .withTopicName(kafkaTargetClusterTopicName) + .withMessageCount(testStorage.getMessageCount()) + .withOauthClientId(OAUTH_CLIENT_NAME) + .withOauthClientSecret(OAUTH_CLIENT_SECRET) + .withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri()) + .build(); + + resourceManager.createResourceWithWait(kafkaOauthClientJob.consumerStrimziOauthTls(targetKafkaCluster)); + + try { + ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount()); + return true; + } catch (WaitException e) { + LOGGER.error("Failed while waiting for consumer to succeed", e); + return false; + } + }); + } + @ParallelTest void testIntrospectionEndpoint() { final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); From a96310b0aa58ccd1e8ed6cdac2acd6b5cab0239c Mon Sep 17 00:00:00 2001 From: Jakub Scholz Date: Thu, 7 Mar 2024 18:01:18 +0100 Subject: [PATCH 2/2] Fix tag Signed-off-by: Jakub Scholz --- .../java/io/strimzi/systemtest/security/oauth/OauthTlsST.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java index dc7fd8312c8..ec053735331 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java @@ -67,6 +67,7 @@ import static io.strimzi.systemtest.TestConstants.CONNECT_COMPONENTS; import static io.strimzi.systemtest.TestConstants.HTTP_BRIDGE_DEFAULT_PORT; import static io.strimzi.systemtest.TestConstants.MIRROR_MAKER; +import static io.strimzi.systemtest.TestConstants.MIRROR_MAKER2; import static io.strimzi.systemtest.TestConstants.NODEPORT_SUPPORTED; import static io.strimzi.systemtest.TestConstants.OAUTH; import static io.strimzi.systemtest.TestConstants.REGRESSION; @@ -425,7 +426,7 @@ void testMirrorMaker() { @Description("As a OAuth MirrorMaker 2, I am able to replicate Topic data using using encrypted communication") @IsolatedTest("Using more tha one Kafka cluster in one Namespace") - @Tag(MIRROR_MAKER) + @Tag(MIRROR_MAKER2) @Tag(NODEPORT_SUPPORTED) @SuppressWarnings({"checkstyle:MethodLength"}) void testMirrorMaker2() {