Skip to content

Commit

Permalink
Reflect the current roles when rolling Kafka nodes in KafkaRoller - Closes strimzi#9434
Browse files Browse the repository at this point in the history

Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj committed Feb 14, 2024
1 parent 891624a commit 4755bcd
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Remove support for Apache Kafka 3.5.0 and 3.5.1
* The `UseKRaft` feature gate moves to beta stage and is enabled by default.
If needed, `UseKRaft` can be disabled in the feature gates configuration in the Cluster Operator.
* Add support for moving from dedicated controller-only KRaft nodes to mixed KRaft nodes
* Fix NullPointerException from missing listenerConfig when using custom auth
* Added support for Kafka Exporter `offset.show-all` parameter
* Prevent removal of the `broker` process role from KRaft mixed-nodes that have assigned partition-replicas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.client.KubernetesClientException;
Expand All @@ -24,6 +25,7 @@
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.Util;
import io.strimzi.operator.common.VertxUtil;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.common.model.OrderedProperties;
import io.strimzi.operator.common.operator.resource.PodOperator;
import io.vertx.core.Future;
Expand Down Expand Up @@ -458,16 +460,21 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext)

restartContext.restartReasons = podNeedsRestart.apply(pod);

// We try to detect the current roles. If we fail to do so, we optimistically assume the roles did not
// change and the desired roles still apply.
boolean isBroker = isAlreadyBroker(pod).orElse(nodeRef.broker());
boolean isController = isAlreadyController(pod).orElse(nodeRef.controller());

try {
checkIfRestartOrReconfigureRequired(nodeRef, restartContext);
checkIfRestartOrReconfigureRequired(nodeRef, isController, isBroker, restartContext);
if (restartContext.forceRestart) {
LOGGER.debugCr(reconciliation, "Pod {} can be rolled now", nodeRef);
restartAndAwaitReadiness(pod, operationTimeoutMs, TimeUnit.MILLISECONDS, restartContext);
} else if (restartContext.needsRestart || restartContext.needsReconfig) {
if (deferController(nodeRef, restartContext)) {
LOGGER.debugCr(reconciliation, "Pod {} is the active controller and there are other pods to verify first.", nodeRef);
throw new ForceableProblem("Pod " + nodeRef.podName() + " is the active controller and there are other pods to verify first");
} else if (!canRoll(nodeRef, 60, TimeUnit.SECONDS, false, restartContext)) {
} else if (!canRoll(nodeRef.nodeId(), isController, isBroker, 60, TimeUnit.SECONDS, false, restartContext)) {
LOGGER.debugCr(reconciliation, "Pod {} cannot be updated right now", nodeRef);
throw new UnforceableProblem("Pod " + nodeRef.podName() + " cannot be updated right now.");
} else {
Expand All @@ -491,7 +498,7 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext)
} catch (ForceableProblem e) {
if (restartContext.podStuck || restartContext.backOff.done() || e.forceNow) {

if (canRoll(nodeRef, 60_000, TimeUnit.MILLISECONDS, true, restartContext)) {
if (canRoll(nodeRef.nodeId(), isController, isBroker, 60_000, TimeUnit.MILLISECONDS, true, restartContext)) {
String errorMsg = e.getMessage();

if (e.getCause() != null) {
Expand Down Expand Up @@ -589,7 +596,7 @@ private void markRestartContextWithForceRestart(RestartContext restartContext) {
* Determine whether the pod should be restarted, or the broker reconfigured.
*/
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext restartContext) throws ForceableProblem, InterruptedException, FatalProblem, UnforceableProblem {
private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isController, boolean isBroker, RestartContext restartContext) throws ForceableProblem, InterruptedException, FatalProblem, UnforceableProblem {
RestartReasons reasonToRestartPod = restartContext.restartReasons;
if (restartContext.podStuck && !reasonToRestartPod.contains(RestartReason.POD_HAS_OLD_REVISION)) {
// If the pod is unschedulable then deleting it, or trying to open an Admin client to it will make no difference
Expand All @@ -609,8 +616,8 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext
KafkaBrokerConfigurationDiff brokerConfigDiff = null;
KafkaBrokerLoggingConfigurationDiff brokerLoggingDiff = null;
boolean needsReconfig = false;
if (nodeRef.controller()) {

if (isController) {
if (maybeInitControllerAdminClient()) {
String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT;
String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId());
Expand All @@ -624,7 +631,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext
} else {
//TODO When https://github.com/strimzi/strimzi-kafka-operator/issues/8593 is complete
// we should change this logic to immediately restart this pod because we cannot connect to it.
if (nodeRef.broker()) {
if (isBroker) {
// If it is a combined node (controller and broker) and the admin client cannot be initialised,
// restart this pod. There is no reason to continue as we won't be able to
// connect an admin client to this pod for other checks later.
Expand All @@ -641,8 +648,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext
}
}

if (nodeRef.broker()) {

if (isBroker) {
if (!maybeInitBrokerAdminClient()) {
LOGGER.infoCr(reconciliation, "Pod {} needs to be restarted, because it does not seem to responding to connection attempts", nodeRef);
reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
Expand Down Expand Up @@ -700,7 +706,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext
* @param nodeRef The reference of the broker.
* @return a Future which completes with the config of the given broker.
*/
protected Config brokerConfig(NodeRef nodeRef) throws ForceableProblem, InterruptedException {
/* test */ Config brokerConfig(NodeRef nodeRef) throws ForceableProblem, InterruptedException {
ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(nodeRef.nodeId()));
return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
30, TimeUnit.SECONDS,
Expand All @@ -713,15 +719,15 @@ protected Config brokerConfig(NodeRef nodeRef) throws ForceableProblem, Interrup
* @param brokerId The id of the broker.
* @return a Future which completes with the logging of the given broker.
*/
protected Config brokerLogging(int brokerId) throws ForceableProblem, InterruptedException {
/* test */ Config brokerLogging(int brokerId) throws ForceableProblem, InterruptedException {
ConfigResource resource = Util.getBrokersLogging(brokerId);
return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
30, TimeUnit.SECONDS,
error -> new ForceableProblem("Error getting broker logging", error)
);
}

protected void dynamicUpdateBrokerConfig(NodeRef nodeRef, Admin ac, KafkaBrokerConfigurationDiff configurationDiff, KafkaBrokerLoggingConfigurationDiff logDiff)
/* test */ void dynamicUpdateBrokerConfig(NodeRef nodeRef, Admin ac, KafkaBrokerConfigurationDiff configurationDiff, KafkaBrokerLoggingConfigurationDiff logDiff)
throws ForceableProblem, InterruptedException {
Map<ConfigResource, Collection<AlterConfigOp>> updatedConfig = new HashMap<>(2);
var podId = nodeRef.nodeId();
Expand Down Expand Up @@ -804,20 +810,20 @@ public FatalProblem(String message) {
}
}

private boolean canRoll(NodeRef nodeRef, long timeout, TimeUnit unit, boolean ignoreSslError, RestartContext restartContext)
private boolean canRoll(int nodeId, boolean isController, boolean isBroker, long timeout, TimeUnit unit, boolean ignoreSslError, RestartContext restartContext)
throws ForceableProblem, InterruptedException, UnforceableProblem {
try {
if (nodeRef.broker() && nodeRef.controller()) {
boolean canRollController = await(restartContext.quorumCheck.canRollController(nodeRef.nodeId()), timeout, unit,
if (isBroker && isController) {
boolean canRollController = await(restartContext.quorumCheck.canRollController(nodeId), timeout, unit,
t -> new UnforceableProblem("An error while trying to determine the possibility of updating Kafka controller pods", t));
boolean canRollBroker = await(availability(brokerAdminClient).canRoll(nodeRef.nodeId()), timeout, unit,
boolean canRollBroker = await(availability(brokerAdminClient).canRoll(nodeId), timeout, unit,
t -> new ForceableProblem("An error while trying to determine the possibility of updating Kafka broker pods", t));
return canRollController && canRollBroker;
} else if (nodeRef.controller()) {
return await(restartContext.quorumCheck.canRollController(nodeRef.nodeId()), timeout, unit,
} else if (isController) {
return await(restartContext.quorumCheck.canRollController(nodeId), timeout, unit,
t -> new UnforceableProblem("An error while trying to determine the possibility of updating Kafka controller pods", t));
} else {
return await(availability(brokerAdminClient).canRoll(nodeRef.nodeId()), timeout, unit,
return await(availability(brokerAdminClient).canRoll(nodeId), timeout, unit,
t -> new ForceableProblem("An error while trying to determine the possibility of updating Kafka broker pods", t));
}
} catch (ForceableProblem | UnforceableProblem e) {
Expand Down Expand Up @@ -909,7 +915,7 @@ protected Future<Void> restart(Pod pod, RestartContext restartContext) {
* Returns an AdminClient instance bootstrapped from the given nodes. If nodes is an
* empty set, use the brokers service to bootstrap the client.
*/
protected Admin adminClient(Set<NodeRef> nodes, boolean ceShouldBeFatal) throws ForceableProblem, FatalProblem {
/* test */ Admin adminClient(Set<NodeRef> nodes, boolean ceShouldBeFatal) throws ForceableProblem, FatalProblem {
// If no nodes are passed initialize the admin client using the brokers service
// TODO when https://github.com/strimzi/strimzi-kafka-operator/issues/8593 is completed review whether
// this function can be reverted to expect nodes to be non empty
Expand All @@ -935,11 +941,11 @@ protected Admin adminClient(Set<NodeRef> nodes, boolean ceShouldBeFatal) throws
}
}

protected KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) {
/* test */ KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) {
return new KafkaQuorumCheck(reconciliation, ac, vertx, controllerQuorumFetchTimeoutMs);
}

protected KafkaAvailability availability(Admin ac) {
/* test */ KafkaAvailability availability(Admin ac) {
return new KafkaAvailability(reconciliation, ac);
}

Expand Down Expand Up @@ -1047,5 +1053,49 @@ protected Future<Void> isReady(String namespace, String podName) {
return Future.failedFuture(error);
});
}

/**
 * Checks from the Pod labels if the Kafka node is already running as a broker or not. This reflects the
 * node's current (actual) role, which may differ from the desired role during a role change.
 *
 * @param pod Current Pod (may be null, e.g. when the Pod does not exist yet)
 *
 * @return Optional with true if the pod is already a broker, false if it is not a broker, or an empty
 * Optional if the label is not present (in which case the caller should fall back to the desired role).
 */
/* test */ static Optional<Boolean> isAlreadyBroker(Pod pod) {
return checkBooleanLabel(pod, Labels.STRIMZI_BROKER_ROLE_LABEL);
}

/**
 * Checks from the Pod labels if the Kafka node is already running as a controller or not. This reflects the
 * node's current (actual) role, which may differ from the desired role during a role change.
 *
 * @param pod Current Pod (may be null, e.g. when the Pod does not exist yet)
 *
 * @return Optional with true if the pod is already a controller, false if it is not a controller, or an empty
 * Optional if the label is not present (in which case the caller should fall back to the desired role).
 */
/* test */ static Optional<Boolean> isAlreadyController(Pod pod) {
return checkBooleanLabel(pod, Labels.STRIMZI_CONTROLLER_ROLE_LABEL);
}

/**
 * Generic method to extract a boolean value from the labels of a Kubernetes resource.
 *
 * @param resource Kubernetes resource with metadata (may be null)
 * @param labelKey Name of the label from which the boolean value should be extracted
 *
 * @return Optional with true if the label is present and set to `true` (case-insensitive), false if it is
 * present with any other value, or an empty Optional if the resource, its metadata, or the label is missing.
 */
private static Optional<Boolean> checkBooleanLabel(HasMetadata resource, String labelKey) {
    // Guard clause: anything missing along the path to the label value means "unknown"
    if (resource == null
            || resource.getMetadata() == null
            || resource.getMetadata().getLabels() == null
            || !resource.getMetadata().getLabels().containsKey(labelKey)) {
        return Optional.empty();
    }

    // Only the exact (case-insensitive) string "true" counts as true; any other value is false
    return Optional.of("true".equalsIgnoreCase(resource.getMetadata().getLabels().get(labelKey)));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.strimzi.operator.common.BackOff;
import io.strimzi.operator.common.DefaultAdminClientProvider;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.common.operator.resource.PodOperator;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -52,6 +53,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -704,6 +706,33 @@ podOps, noException(), null, noException(), noException(), noException(),
asList(7, 4, 3, 5, 6, 8, 1, 0, 2)); //Rolls in order: unready controllers, ready controllers, unready brokers, ready brokers
}

@Test
public void testExistingRoles() {
// No pod at all -> roles unknown
assertThat(KafkaRoller.isAlreadyBroker(null), is(Optional.empty()));
assertThat(KafkaRoller.isAlreadyController(null), is(Optional.empty()));

// Pod exists but has no role labels -> roles unknown
Pod pod = new PodBuilder().withNewMetadata().withName("my-pod").endMetadata().build();
assertThat(KafkaRoller.isAlreadyBroker(pod), is(Optional.empty()));
assertThat(KafkaRoller.isAlreadyController(pod), is(Optional.empty()));

// Labels set to values other than "true" -> treated as false
pod = new PodBuilder().withNewMetadata().withName("my-pod").withLabels(Map.of(Labels.STRIMZI_BROKER_ROLE_LABEL, "grr", Labels.STRIMZI_CONTROLLER_ROLE_LABEL, "meh")).endMetadata().build();
assertThat(KafkaRoller.isAlreadyBroker(pod).orElseThrow(), is(false));
assertThat(KafkaRoller.isAlreadyController(pod).orElseThrow(), is(false));

// Labels explicitly set to "false" -> false
pod = new PodBuilder().withNewMetadata().withName("my-pod").withLabels(Map.of(Labels.STRIMZI_BROKER_ROLE_LABEL, "false", Labels.STRIMZI_CONTROLLER_ROLE_LABEL, "false")).endMetadata().build();
assertThat(KafkaRoller.isAlreadyBroker(pod).orElseThrow(), is(false));
assertThat(KafkaRoller.isAlreadyController(pod).orElseThrow(), is(false));

// Labels set to "true" -> true
pod = new PodBuilder().withNewMetadata().withName("my-pod").withLabels(Map.of(Labels.STRIMZI_BROKER_ROLE_LABEL, "true", Labels.STRIMZI_CONTROLLER_ROLE_LABEL, "true")).endMetadata().build();
assertThat(KafkaRoller.isAlreadyBroker(pod).orElseThrow(), is(true));
assertThat(KafkaRoller.isAlreadyController(pod).orElseThrow(), is(true));
}

private TestingKafkaRoller rollerWithControllers(PodOperator podOps, int... controllers) {
return new TestingKafkaRoller(null, null, addPodNames(KafkaRollerTest.REPLICAS), podOps,
noException(), null, noException(), noException(), noException(),
Expand Down

0 comments on commit 4755bcd

Please sign in to comment.