Skip to content

Commit

Permalink
Check assigned partition-replicas when changing node roles (#9608)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj authored Jan 31, 2024
1 parent cabe2a5 commit 9d7b1a6
Show file tree
Hide file tree
Showing 19 changed files with 842 additions and 472 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
If needed, `UseKRaft` can be disabled in the feature gates configuration in the Cluster Operator.
* Fix NullPointerException from missing listenerConfig when using custom auth
* Added support for Kafka Exporter `offset.show-all` parameter
* Prevent removal of the `broker` process role from KRaft mixed-nodes that have assigned partition-replicas
* Improve broker scale-down prevention to continue in reconciliation when scale-down cannot be executed

### Changes, deprecations and removals
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,9 @@ public Set<NodeRef> nodes() {
}

/**
* Generates list of references to Kafka node ids going to be removed from the Kafka cluster.
* Generates list of Kafka node IDs that are going to be removed from the Kafka cluster.
*
* @return Set of Kafka node ids which are going to be removed
* @return Set of Kafka node IDs which are going to be removed
*/
public Set<Integer> removedNodes() {
Set<Integer> nodes = new LinkedHashSet<>();
Expand All @@ -417,6 +417,21 @@ public Set<Integer> removedNodes() {
return nodes;
}

/**
* Generates list of Kafka node IDs that used to have the broker role but do not have it anymore.
*
* @return Set of Kafka node IDs which are removing the broker role
*/
public Set<Integer> usedToBeBrokerNodes() {
Set<Integer> nodes = new LinkedHashSet<>();

for (KafkaPool pool : nodePools) {
nodes.addAll(pool.usedToBeBrokerNodes());
}

return nodes;
}

/**
* Generates list of references to Kafka nodes for this Kafka cluster which have the broker role. The references
* contain both the pod name and the ID of the Kafka node. This includes only the broker nodes. Controller nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,20 @@ public KafkaNodePoolStatus generateNodePoolStatus(String clusterId) {
}

/**
* Generates list of references to Kafka nodes going to be removed from the Kafka cluster.
* Generates set of Kafka node IDs going to be removed from the Kafka cluster.
*
* @return Set of Kafka node ids which are going to be removed
* @return Set of Kafka node IDs which are going to be removed
*/
public Set<Integer> scaledDownNodes() {
return idAssignment.toBeRemoved();
}

/**
* Generates set of Kafka node IDs that used to have the broker role but do not have it anymore.
*
* @return Set of Kafka node IDs which are removing the broker role
*/
public Set<Integer> usedToBeBrokerNodes() {
return idAssignment.usedToBeBroker();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
/**
* Record for holding the assignment of node IDs for a single node pool
*
* @param current Current node IDs
* @param desired Desired node IDs
* @param toBeRemoved Node IDs which should be removed
* @param toBeAdded Node IDs which should be added
* @param current Current node IDs
* @param desired Desired node IDs
* @param toBeRemoved Node IDs which should be removed
* @param toBeAdded Node IDs which should be added
* @param usedToBeBroker Node IDs that used to have the broker role but should not have it anymore
*/
public record NodeIdAssignment(Set<Integer> current, Set<Integer> desired, Set<Integer> toBeRemoved, Set<Integer> toBeAdded) {
public record NodeIdAssignment(Set<Integer> current, Set<Integer> desired, Set<Integer> toBeRemoved, Set<Integer> toBeAdded, Set<Integer> usedToBeBroker) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.strimzi.operator.cluster.model.nodepools;

import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.InvalidConfigurationException;
import io.strimzi.operator.common.Reconciliation;
Expand All @@ -15,6 +16,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;

/**
Expand Down Expand Up @@ -60,6 +62,7 @@ public NodeIdAssignor(Reconciliation reconciliation, List<KafkaNodePool> nodePoo
for (KafkaNodePool pool : nodePools) {
TreeSet<Integer> current;
TreeSet<Integer> desired;
TreeSet<Integer> usedToBeBroker;
TreeSet<Integer> toBeRemoved = new TreeSet<>();
TreeSet<Integer> toBeAdded = new TreeSet<>();

Expand Down Expand Up @@ -93,9 +96,12 @@ public NodeIdAssignor(Reconciliation reconciliation, List<KafkaNodePool> nodePoo
// We have no change
desired = current;
}

usedToBeBroker = brokerNodesBecomingControllerOnlyNodes(pool, current, desired);
} else {
// New pool? It is all scale-up
current = new TreeSet<>();
usedToBeBroker = new TreeSet<>();
desired = new TreeSet<>();

// Provides the node IDs which the user would like to assign to the next broker(s)
Expand All @@ -108,7 +114,7 @@ public NodeIdAssignor(Reconciliation reconciliation, List<KafkaNodePool> nodePoo
}
}

assignments.put(pool.getMetadata().getName(), new NodeIdAssignment(current, desired, toBeRemoved, toBeAdded));
assignments.put(pool.getMetadata().getName(), new NodeIdAssignment(current, desired, toBeRemoved, toBeAdded, usedToBeBroker));
}
}

Expand Down Expand Up @@ -303,5 +309,30 @@ private NodeIdRange removeIdRange(Reconciliation reconciliation, KafkaNodePool p
return null;
}
}

/**
* Finds any Kafka nodes that used to have the broker role but should not have it anymore. These nodes require a
* special treatment - for example they should have a special check for not having any partition replicas assigned.
*
* @param pool Node Pool to check for the roles change
* @param current Current node IDs belonging to this node pool
* @param desired Desired node IDs belonging to this node pool
*
* @return Set of node IDs that used to have the broker role but will not have it anymore
*/
/* test */ static TreeSet<Integer> brokerNodesBecomingControllerOnlyNodes(KafkaNodePool pool, Set<Integer> current, Set<Integer> desired) {
if (pool.getStatus() != null
&& pool.getSpec().getRoles() != null
&& pool.getStatus().getRoles() != null
&& pool.getStatus().getRoles().contains(ProcessRoles.BROKER) // Used to have the broker role
&& !pool.getSpec().getRoles().contains(ProcessRoles.BROKER)) { // But should not have it anymore
// Collect all node IDs that are both current and desired (i.e. we do not care about old nodes being scaled down or new nodes being scaled up)
TreeSet<Integer> usedToBeBroker = new TreeSet<>(desired);
usedToBeBroker.retainAll(current);
return usedToBeBroker;
} else {
return new TreeSet<>();
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.operator.cluster.operator.assembly;

import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.operator.cluster.model.KafkaCluster;
import io.strimzi.operator.common.AdminClientProvider;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.VertxUtil;
import io.strimzi.operator.common.operator.resource.SecretOperator;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Class which contains several utility function which check if broker scale down or role change can be done or not.
*/
public class BrokersInUseCheck {
/**
* Logger
*/
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(BrokersInUseCheck.class.getName());

/**
* Checks if broker contains any partition replicas when scaling down
*
* @param reconciliation Reconciliation marker
* @param vertx Vert.x instance
* @param secretOperator Secret operator for working with Secrets
* @param adminClientProvider Used to create the Admin client instance
*
* @return returns future set of node ids containing partition replicas based on the outcome of the check
*/
public Future<Set<Integer>> brokersInUse(Reconciliation reconciliation, Vertx vertx, SecretOperator secretOperator, AdminClientProvider adminClientProvider) {
return ReconcilerUtils.clientSecrets(reconciliation, secretOperator)
.compose(adminClientSecrets -> {
try {
String bootstrapHostname = KafkaResources.bootstrapServiceName(reconciliation.name()) + "." + reconciliation.namespace() + ".svc:" + KafkaCluster.REPLICATION_PORT;
LOGGER.debugCr(reconciliation, "Creating AdminClient for Kafka cluster in namespace {}", reconciliation.namespace());
Admin kafkaAdmin = adminClientProvider.createAdminClient(bootstrapHostname, adminClientSecrets.resultAt(0), adminClientSecrets.resultAt(1), "cluster-operator");

return topicNames(reconciliation, vertx, kafkaAdmin)
.compose(names -> describeTopics(reconciliation, vertx, kafkaAdmin, names))
.compose(topicDescriptions -> {
Set<Integer> brokersWithPartitionReplicas = new HashSet<>();

for (TopicDescription td : topicDescriptions.values()) {
for (TopicPartitionInfo pd : td.partitions()) {
for (org.apache.kafka.common.Node broker : pd.replicas()) {
brokersWithPartitionReplicas.add(broker.id());
}
}
}

kafkaAdmin.close();
return Future.succeededFuture(brokersWithPartitionReplicas);
}).recover(error -> {
LOGGER.warnCr(reconciliation, "Failed to get list of brokers in use", error);
kafkaAdmin.close();
return Future.failedFuture(error);
});
} catch (KafkaException e) {
LOGGER.warnCr(reconciliation, "Failed to check if broker contains any partition replicas", e);
return Future.failedFuture(e);
}
});
}

/**
* This method gets the topic names after interacting with the Admin client
*
* @param reconciliation Reconciliation marker
* @param vertx Vert.x instance
* @param kafkaAdmin Instance of Kafka Admin
* @return a Future with set of topic names
*/
/* test */ Future<Set<String>> topicNames(Reconciliation reconciliation, Vertx vertx, Admin kafkaAdmin) {
return VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, kafkaAdmin.listTopics(new ListTopicsOptions().listInternal(true)).names());
}

/**
* Returns a collection of topic descriptions
*
* @param reconciliation Reconciliation marker
* @param vertx Vert.x instance
* @param kafkaAdmin Instance of Admin client
* @param names Set of topic names
* @return a Future with map containing the topic name and description
*/
/* test */ Future<Map<String, TopicDescription>> describeTopics(Reconciliation reconciliation, Vertx vertx, Admin kafkaAdmin, Set<String> names) {
return VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, kafkaAdmin.describeTopics(names).allTopicNames());
}
}
Loading

0 comments on commit 9d7b1a6

Please sign in to comment.