diff --git a/CHANGELOG.md b/CHANGELOG.md
index 03a08517660..63180c805ba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 * Remove support for Apache Kafka 3.5.0 and 3.5.1
 * The `UseKRaft` feature gate moves to beta stage and is enabled by default. If needed, `UseKRaft` can be disabled in the feature gates configuration in the Cluster Operator.
+* Add support for moving from dedicated controller-only KRaft nodes to mixed KRaft nodes
 * Fix NullPointerException from missing listenerConfig when using custom auth
 * Added support for Kafka Exporter `offset.show-all` parameter
 * Prevent removal of the `broker` process role from KRaft mixed-nodes that have assigned partition-replicas
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
index 3cf13acaa73..6ddbecf5cfe 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java
@@ -7,6 +7,7 @@
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
 import io.fabric8.kubernetes.api.model.ContainerStatus;
+import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.Secret;
 import io.fabric8.kubernetes.client.KubernetesClientException;
@@ -24,6 +25,7 @@
 import io.strimzi.operator.common.ReconciliationLogger;
 import io.strimzi.operator.common.Util;
 import io.strimzi.operator.common.VertxUtil;
+import io.strimzi.operator.common.model.Labels;
 import io.strimzi.operator.common.model.OrderedProperties;
 import io.strimzi.operator.common.operator.resource.PodOperator;
 import io.vertx.core.Future;
@@ -458,8 +460,13 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext)
         restartContext.restartReasons = podNeedsRestart.apply(pod);
 
+        // We try to detect the current roles. If we fail to do so, we optimistically assume the roles did not
+        // change and the desired roles still apply.
+        boolean isBroker = isAlreadyBroker(pod).orElse(nodeRef.broker());
+        boolean isController = isAlreadyController(pod).orElse(nodeRef.controller());
+
         try {
-            checkIfRestartOrReconfigureRequired(nodeRef, restartContext);
+            checkIfRestartOrReconfigureRequired(nodeRef, isController, isBroker, restartContext);
             if (restartContext.forceRestart) {
                 LOGGER.debugCr(reconciliation, "Pod {} can be rolled now", nodeRef);
                 restartAndAwaitReadiness(pod, operationTimeoutMs, TimeUnit.MILLISECONDS, restartContext);
@@ -467,7 +474,7 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext)
                 if (deferController(nodeRef, restartContext)) {
                     LOGGER.debugCr(reconciliation, "Pod {} is the active controller and there are other pods to verify first.", nodeRef);
                     throw new ForceableProblem("Pod " + nodeRef.podName() + " is the active controller and there are other pods to verify first");
-                } else if (!canRoll(nodeRef, 60, TimeUnit.SECONDS, false, restartContext)) {
+                } else if (!canRoll(nodeRef.nodeId(), isController, isBroker, 60, TimeUnit.SECONDS, false, restartContext)) {
                     LOGGER.debugCr(reconciliation, "Pod {} cannot be updated right now", nodeRef);
                     throw new UnforceableProblem("Pod " + nodeRef.podName() + " cannot be updated right now.");
                 } else {
@@ -491,7 +498,7 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext)
         } catch (ForceableProblem e) {
             if (restartContext.podStuck || restartContext.backOff.done() || e.forceNow) {
-                if (canRoll(nodeRef, 60_000, TimeUnit.MILLISECONDS, true, restartContext)) {
+                if (canRoll(nodeRef.nodeId(), isController, isBroker, 60_000, TimeUnit.MILLISECONDS, true, restartContext)) {
                     String errorMsg = e.getMessage();
                     if (e.getCause() != null) {
@@ -589,7 +596,7 @@ private void markRestartContextWithForceRestart(RestartContext restartContext) {
      * Determine whether the pod should be restarted, or the broker reconfigured.
      */
     @SuppressWarnings("checkstyle:CyclomaticComplexity")
-    private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext restartContext) throws ForceableProblem, InterruptedException, FatalProblem, UnforceableProblem {
+    private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isController, boolean isBroker, RestartContext restartContext) throws ForceableProblem, InterruptedException, FatalProblem, UnforceableProblem {
         RestartReasons reasonToRestartPod = restartContext.restartReasons;
         if (restartContext.podStuck && !reasonToRestartPod.contains(RestartReason.POD_HAS_OLD_REVISION)) {
             // If the pod is unschedulable then deleting it, or trying to open an Admin client to it will make no difference
@@ -609,8 +616,8 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext
         KafkaBrokerConfigurationDiff brokerConfigDiff = null;
         KafkaBrokerLoggingConfigurationDiff brokerLoggingDiff = null;
         boolean needsReconfig = false;
-        if (nodeRef.controller()) {
+        if (isController) {
             if (maybeInitControllerAdminClient()) {
                 String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT;
                 String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId());
@@ -624,7 +631,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext
             } else {
                 //TODO When https://github.com/strimzi/strimzi-kafka-operator/issues/8593 is complete
                 // we should change this logic to immediately restart this pod because we cannot connect to it.
-                if (nodeRef.broker()) {
+                if (isBroker) {
                     // If it is a combined node (controller and broker) and the admin client cannot be initialised,
                     // restart this pod. There is no reason to continue as we won't be able to
                     // connect an admin client to this pod for other checks later.
@@ -641,8 +648,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext
             }
         }
 
-        if (nodeRef.broker()) {
-
+        if (isBroker) {
             if (!maybeInitBrokerAdminClient()) {
                 LOGGER.infoCr(reconciliation, "Pod {} needs to be restarted, because it does not seem to responding to connection attempts", nodeRef);
                 reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
@@ -700,7 +706,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext
      * @param nodeRef The reference of the broker.
      * @return a Future which completes with the config of the given broker.
      */
-    protected Config brokerConfig(NodeRef nodeRef) throws ForceableProblem, InterruptedException {
+    /* test */ Config brokerConfig(NodeRef nodeRef) throws ForceableProblem, InterruptedException {
         ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(nodeRef.nodeId()));
         return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)), 30, TimeUnit.SECONDS,
@@ -713,7 +719,7 @@ protected Config brokerConfig(NodeRef nodeRef) throws ForceableProblem, Interrup
      * @param brokerId The id of the broker.
      * @return a Future which completes with the logging of the given broker.
      */
-    protected Config brokerLogging(int brokerId) throws ForceableProblem, InterruptedException {
+    /* test */ Config brokerLogging(int brokerId) throws ForceableProblem, InterruptedException {
         ConfigResource resource = Util.getBrokersLogging(brokerId);
         return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)), 30, TimeUnit.SECONDS,
@@ -721,7 +727,7 @@ protected Config brokerLogging(int brokerId) throws ForceableProblem, Interrupte
         );
     }
 
-    protected void dynamicUpdateBrokerConfig(NodeRef nodeRef, Admin ac, KafkaBrokerConfigurationDiff configurationDiff, KafkaBrokerLoggingConfigurationDiff logDiff)
+    /* test */ void dynamicUpdateBrokerConfig(NodeRef nodeRef, Admin ac, KafkaBrokerConfigurationDiff configurationDiff, KafkaBrokerLoggingConfigurationDiff logDiff)
             throws ForceableProblem, InterruptedException {
         Map<ConfigResource, Collection<AlterConfigOp>> updatedConfig = new HashMap<>(2);
         var podId = nodeRef.nodeId();
@@ -804,20 +810,20 @@ public FatalProblem(String message) {
         }
     }
 
-    private boolean canRoll(NodeRef nodeRef, long timeout, TimeUnit unit, boolean ignoreSslError, RestartContext restartContext)
+    private boolean canRoll(int nodeId, boolean isController, boolean isBroker, long timeout, TimeUnit unit, boolean ignoreSslError, RestartContext restartContext)
             throws ForceableProblem, InterruptedException, UnforceableProblem {
         try {
-            if (nodeRef.broker() && nodeRef.controller()) {
-                boolean canRollController = await(restartContext.quorumCheck.canRollController(nodeRef.nodeId()), timeout, unit,
+            if (isBroker && isController) {
+                boolean canRollController = await(restartContext.quorumCheck.canRollController(nodeId), timeout, unit,
                         t -> new UnforceableProblem("An error while trying to determine the possibility of updating Kafka controller pods", t));
-                boolean canRollBroker = await(availability(brokerAdminClient).canRoll(nodeRef.nodeId()), timeout, unit,
+                boolean canRollBroker = await(availability(brokerAdminClient).canRoll(nodeId), timeout, unit,
                         t -> new ForceableProblem("An error while trying to determine the possibility of updating Kafka broker pods", t));
                 return canRollController && canRollBroker;
-            } else if (nodeRef.controller()) {
-                return await(restartContext.quorumCheck.canRollController(nodeRef.nodeId()), timeout, unit,
+            } else if (isController) {
+                return await(restartContext.quorumCheck.canRollController(nodeId), timeout, unit,
                         t -> new UnforceableProblem("An error while trying to determine the possibility of updating Kafka controller pods", t));
             } else {
-                return await(availability(brokerAdminClient).canRoll(nodeRef.nodeId()), timeout, unit,
+                return await(availability(brokerAdminClient).canRoll(nodeId), timeout, unit,
                         t -> new ForceableProblem("An error while trying to determine the possibility of updating Kafka broker pods", t));
             }
         } catch (ForceableProblem | UnforceableProblem e) {
@@ -909,7 +915,7 @@ protected Future<Void> restart(Pod pod, RestartContext restartContext) {
      * Returns an AdminClient instance bootstrapped from the given nodes. If nodes is an
      * empty set, use the brokers service to bootstrap the client.
      */
-    protected Admin adminClient(Set<NodeRef> nodes, boolean ceShouldBeFatal) throws ForceableProblem, FatalProblem {
+    /* test */ Admin adminClient(Set<NodeRef> nodes, boolean ceShouldBeFatal) throws ForceableProblem, FatalProblem {
         // If no nodes are passed initialize the admin client using the brokers service
         // TODO when https://github.com/strimzi/strimzi-kafka-operator/issues/8593 is completed review whether
         // this function can be reverted to expect nodes to be non empty
@@ -935,11 +941,11 @@ protected Admin adminClient(Set<NodeRef> nodes, boolean ceShouldBeFatal) throws
         }
     }
 
-    protected KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) {
+    /* test */ KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) {
         return new KafkaQuorumCheck(reconciliation, ac, vertx, controllerQuorumFetchTimeoutMs);
     }
 
-    protected KafkaAvailability availability(Admin ac) {
+    /* test */ KafkaAvailability availability(Admin ac) {
         return new KafkaAvailability(reconciliation, ac);
     }
 
@@ -1047,5 +1053,49 @@ protected Future<Void> isReady(String namespace, String podName) {
             return Future.failedFuture(error);
         });
     }
+
+    /**
+     * Checks from the Pod labels if the Kafka node is already a broker or not.
+     *
+     * @param pod Current Pod
+     *
+     * @return Optional with true if the pod is already a broker, false if it is not a broker, or an empty Optional
+     *         if the label is not present.
+     */
+    /* test */ static Optional<Boolean> isAlreadyBroker(Pod pod) {
+        return checkBooleanLabel(pod, Labels.STRIMZI_BROKER_ROLE_LABEL);
+    }
+
+    /**
+     * Checks from the Pod labels if the Kafka node is already a controller or not.
+     *
+     * @param pod Current Pod
+     *
+     * @return Optional with true if the pod is already a controller, false if it is not a controller, or an empty Optional
+     *         if the label is not present.
+     */
+    /* test */ static Optional<Boolean> isAlreadyController(Pod pod) {
+        return checkBooleanLabel(pod, Labels.STRIMZI_CONTROLLER_ROLE_LABEL);
+    }
+
+    /**
+     * Generic method to extract a boolean value from Kubernetes resource labels.
+     *
+     * @param pod Kube resource with metadata
+     * @param annotation Name of the label for which we want to extract the boolean value
+     *
+     * @return Optional with true if the label is present and is set to `true`, false if it is present and not set to
+     *         `true`, or an empty Optional if the label is not present.
+     */
+    private static Optional<Boolean> checkBooleanLabel(HasMetadata pod, String annotation) {
+        if (pod != null
+                && pod.getMetadata() != null
+                && pod.getMetadata().getLabels() != null
+                && pod.getMetadata().getLabels().containsKey(annotation)) {
+            return Optional.of("true".equalsIgnoreCase(pod.getMetadata().getLabels().get(annotation)));
+        } else {
+            return Optional.empty();
+        }
+    }
 }
diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java
index 755561ac2ea..785ee957e80 100644
--- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java
+++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java
@@ -18,6 +18,7 @@
 import io.strimzi.operator.common.BackOff;
 import io.strimzi.operator.common.DefaultAdminClientProvider;
 import io.strimzi.operator.common.Reconciliation;
+import io.strimzi.operator.common.model.Labels;
 import io.strimzi.operator.common.operator.resource.PodOperator;
 import io.vertx.core.Future;
 import io.vertx.core.Vertx;
@@ -52,6 +53,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -704,6 +706,33 @@ podOps, noException(), null, noException(), noException(), noException(),
                 asList(7, 4, 3, 5, 6, 8, 1, 0, 2)); //Rolls in order: unready controllers, ready controllers, unready brokers, ready brokers
     }
 
+    @Test
+    public void testExistingRoles() {
+        // No pod
+        assertThat(KafkaRoller.isAlreadyBroker(null), is(Optional.empty()));
+        assertThat(KafkaRoller.isAlreadyController(null), is(Optional.empty()));
+
+        // No role labels
+        Pod pod = new PodBuilder().withNewMetadata().withName("my-pod").endMetadata().build();
+        assertThat(KafkaRoller.isAlreadyBroker(pod), is(Optional.empty()));
+        assertThat(KafkaRoller.isAlreadyController(pod), is(Optional.empty()));
+
+        // Labels set to a wrong value
+        pod = new PodBuilder().withNewMetadata().withName("my-pod").withLabels(Map.of(Labels.STRIMZI_BROKER_ROLE_LABEL, "grr", Labels.STRIMZI_CONTROLLER_ROLE_LABEL, "meh")).endMetadata().build();
+        assertThat(KafkaRoller.isAlreadyBroker(pod).orElseThrow(), is(false));
+        assertThat(KafkaRoller.isAlreadyController(pod).orElseThrow(), is(false));
+
+        // Labels set to false
+        pod = new PodBuilder().withNewMetadata().withName("my-pod").withLabels(Map.of(Labels.STRIMZI_BROKER_ROLE_LABEL, "false", Labels.STRIMZI_CONTROLLER_ROLE_LABEL, "false")).endMetadata().build();
+        assertThat(KafkaRoller.isAlreadyBroker(pod).orElseThrow(), is(false));
+        assertThat(KafkaRoller.isAlreadyController(pod).orElseThrow(), is(false));
+
+        // Labels set to true
+        pod = new PodBuilder().withNewMetadata().withName("my-pod").withLabels(Map.of(Labels.STRIMZI_BROKER_ROLE_LABEL, "true", Labels.STRIMZI_CONTROLLER_ROLE_LABEL, "true")).endMetadata().build();
+        assertThat(KafkaRoller.isAlreadyBroker(pod).orElseThrow(), is(true));
+        assertThat(KafkaRoller.isAlreadyController(pod).orElseThrow(), is(true));
+    }
+
     private TestingKafkaRoller rollerWithControllers(PodOperator podOps, int... controllers) {
         return new TestingKafkaRoller(null, null, addPodNames(KafkaRollerTest.REPLICAS), podOps,
             noException(), null, noException(), noException(), noException(),