Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add current roles to the node pool status #9458

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({ "conditions", "observedGeneration", "nodeIds", "clusterId", "replicas", "labelSelector" })
@JsonPropertyOrder({ "conditions", "observedGeneration", "nodeIds", "clusterId", "roles", "replicas", "labelSelector" })
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaNodePoolStatus extends Status {
private static final long serialVersionUID = 1L;

private List<Integer> nodeIds;
private String clusterId;
private List<ProcessRoles> roles;

// Replicas and label selector are required for scale subresource
private int replicas;
Expand All @@ -54,6 +55,15 @@ public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}

@Description("The roles currently assigned to this pool.")
ppatierno marked this conversation as resolved.
Show resolved Hide resolved
public List<ProcessRoles> getRoles() {
return roles;
}

public void setRoles(List<ProcessRoles> roles) {
this.roles = roles;
}

@JsonInclude(JsonInclude.Include.NON_NULL)
@Description("The current number of pods being used to provide this resource.")
public int getReplicas() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ public KafkaNodePoolStatus generateNodePoolStatus(String clusterId) {
return new KafkaNodePoolStatusBuilder()
.withClusterId(clusterId)
.withNodeIds(new ArrayList<>(idAssignment.desired()))
.withRoles(processRoles.stream().sorted().toList())
.withReplicas(idAssignment.desired().size())
.withLabelSelector(getSelectorLabels().toSelectorString())
.withConditions(warningConditions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public static KafkaNodePool convertKafkaToVirtualNodePool(Kafka kafka, Integer e
.endSpec()
.withNewStatus()
.withNodeIds(nodeIds)
.withRoles(ProcessRoles.BROKER)
.endStatus()
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,14 @@ public void testNodesAndStatuses() {
assertThat(statuses.get("controllers").getLabelSelector(), is("strimzi.io/cluster=my-cluster,strimzi.io/name=my-cluster-kafka,strimzi.io/kind=Kafka,strimzi.io/pool-name=controllers"));
assertThat(statuses.get("controllers").getNodeIds().size(), is(3));
assertThat(statuses.get("controllers").getNodeIds(), hasItems(0, 1, 2));
assertThat(statuses.get("controllers").getRoles().size(), is(1));
assertThat(statuses.get("controllers").getRoles(), hasItems(ProcessRoles.CONTROLLER));
assertThat(statuses.get("brokers").getReplicas(), is(3));
assertThat(statuses.get("brokers").getLabelSelector(), is("strimzi.io/cluster=my-cluster,strimzi.io/name=my-cluster-kafka,strimzi.io/kind=Kafka,strimzi.io/pool-name=brokers"));
assertThat(statuses.get("brokers").getNodeIds().size(), is(3));
assertThat(statuses.get("brokers").getNodeIds(), hasItems(1000, 1001, 1002));
assertThat(statuses.get("brokers").getRoles().size(), is(1));
assertThat(statuses.get("brokers").getRoles(), hasItems(ProcessRoles.BROKER));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,14 @@ public void testNodesAndStatuses() {
assertThat(statuses.get("pool-a").getLabelSelector(), is("strimzi.io/cluster=my-cluster,strimzi.io/name=my-cluster-kafka,strimzi.io/kind=Kafka,strimzi.io/pool-name=pool-a"));
assertThat(statuses.get("pool-a").getNodeIds().size(), is(3));
assertThat(statuses.get("pool-a").getNodeIds(), hasItems(0, 1, 2));
assertThat(statuses.get("pool-a").getRoles().size(), is(1));
assertThat(statuses.get("pool-a").getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(statuses.get("pool-b").getReplicas(), is(2));
assertThat(statuses.get("pool-b").getLabelSelector(), is("strimzi.io/cluster=my-cluster,strimzi.io/name=my-cluster-kafka,strimzi.io/kind=Kafka,strimzi.io/pool-name=pool-b"));
assertThat(statuses.get("pool-b").getNodeIds().size(), is(2));
assertThat(statuses.get("pool-b").getNodeIds(), hasItems(10, 11));
assertThat(statuses.get("pool-b").getRoles().size(), is(1));
assertThat(statuses.get("pool-b").getRoles(), hasItems(ProcessRoles.BROKER));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ public void testKafkaPool() {
assertThat(status.getLabelSelector(), is("strimzi.io/cluster=my-cluster,strimzi.io/name=my-cluster-kafka,strimzi.io/kind=Kafka,strimzi.io/pool-name=pool"));
assertThat(status.getNodeIds().size(), is(3));
assertThat(status.getNodeIds(), hasItems(10, 11, 13));
assertThat(status.getRoles().size(), is(1));
assertThat(status.getRoles(), hasItems(ProcessRoles.BROKER));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ public void testConvertMinimalKafka() {

// Status
assertThat(pool.getStatus().getNodeIds(), is(nullValue()));
assertThat(pool.getStatus().getRoles().size(), is(1));
assertThat(pool.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
}

@Test
Expand All @@ -206,6 +208,8 @@ public void testConvertKafkaWithExistingReplicas() {
// Status
assertThat(pool.getStatus().getNodeIds().size(), is(3));
assertThat(pool.getStatus().getNodeIds(), hasItems(0, 1, 2));
assertThat(pool.getStatus().getRoles().size(), is(1));
assertThat(pool.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
}

@Test
Expand Down Expand Up @@ -267,5 +271,7 @@ public void testConvertMaximalKafka() {
// Status
assertThat(pool.getStatus().getNodeIds().size(), is(3));
assertThat(pool.getStatus().getNodeIds(), hasItems(0, 1, 2));
assertThat(pool.getStatus().getRoles().size(), is(1));
assertThat(pool.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -501,10 +501,14 @@ public void testReconcileKafkaScaleDown(VertxTestContext context) {
KafkaNodePool poolA = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-a").get();
assertThat(poolA.getStatus().getReplicas(), is(2));
assertThat(poolA.getStatus().getNodeIds(), is(List.of(0, 1)));
assertThat(poolA.getStatus().getRoles().size(), is(1));
assertThat(poolA.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

KafkaNodePool poolB = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-b").get();
assertThat(poolB.getStatus().getReplicas(), is(2));
assertThat(poolB.getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(poolB.getStatus().getRoles().size(), is(1));
assertThat(poolB.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

async.flag();
})));
Expand Down Expand Up @@ -543,10 +547,14 @@ public void testReconcileKafkaScaleUp(VertxTestContext context) {
KafkaNodePool poolA = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-a").get();
assertThat(poolA.getStatus().getReplicas(), is(4));
assertThat(poolA.getStatus().getNodeIds(), is(List.of(0, 1, 2, 5)));
assertThat(poolA.getStatus().getRoles().size(), is(1));
assertThat(poolA.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

KafkaNodePool poolB = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-b").get();
assertThat(poolB.getStatus().getReplicas(), is(2));
assertThat(poolB.getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(poolB.getStatus().getRoles().size(), is(1));
assertThat(poolB.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

async.flag();
})));
Expand Down Expand Up @@ -601,14 +609,20 @@ public void testReconcileAddPool(VertxTestContext context) {
KafkaNodePool poolA = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-a").get();
assertThat(poolA.getStatus().getReplicas(), is(3));
assertThat(poolA.getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(poolA.getStatus().getRoles().size(), is(1));
assertThat(poolA.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

KafkaNodePool poolB = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-b").get();
assertThat(poolB.getStatus().getReplicas(), is(2));
assertThat(poolB.getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(poolB.getStatus().getRoles().size(), is(1));
assertThat(poolB.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

KafkaNodePool poolC = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-c").get();
assertThat(poolC.getStatus().getReplicas(), is(2));
assertThat(poolC.getStatus().getNodeIds(), is(List.of(5, 6)));
assertThat(poolC.getStatus().getRoles().size(), is(1));
assertThat(poolC.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

async.flag();
})));
Expand Down Expand Up @@ -658,14 +672,20 @@ public void testReconcileAndRemovePool(VertxTestContext context) {
KafkaNodePool poolA = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-a").get();
assertThat(poolA.getStatus().getReplicas(), is(3));
assertThat(poolA.getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(poolA.getStatus().getRoles().size(), is(1));
assertThat(poolA.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

KafkaNodePool poolB = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-b").get();
assertThat(poolB.getStatus().getReplicas(), is(2));
assertThat(poolB.getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(poolB.getStatus().getRoles().size(), is(1));
assertThat(poolB.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

KafkaNodePool poolC = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-c").get();
assertThat(poolC.getStatus().getReplicas(), is(2));
assertThat(poolC.getStatus().getNodeIds(), is(List.of(5, 6)));
assertThat(poolC.getStatus().getRoles().size(), is(1));
assertThat(poolC.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

// Remove pool-b
Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-b").withPropagationPolicy(DeletionPropagation.FOREGROUND).delete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.function.Function;
import java.util.stream.IntStream;

import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
Expand Down Expand Up @@ -359,9 +360,13 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getReplicas(), is(3));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getReplicas(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("pool-a", "pool-b")));

// Assert the info passed over for Cruise Control
Expand Down Expand Up @@ -778,9 +783,13 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getReplicas(), is(3));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getReplicas(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("pool-a", "pool-b")));

// Assert the info passed over for Cruise Control
Expand Down Expand Up @@ -955,9 +964,13 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getReplicas(), is(3));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getReplicas(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("pool-a", "pool-b")));

// Assert the info passed over for Cruise Control
Expand Down Expand Up @@ -1119,11 +1132,17 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
assertThat(kafkaNodePoolStatusCaptor.getAllValues().size(), is(3));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getReplicas(), is(3));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getReplicas(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(2).getStatus().getReplicas(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(2).getStatus().getNodeIds(), is(List.of(5, 6)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(2).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(2).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(2).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("pool-a", "pool-b", "pool-c")));

// Assert the info passed over for Cruise Control
Expand Down Expand Up @@ -1295,8 +1314,12 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
assertThat(kafkaNodePoolStatusCaptor.getAllValues().size(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getReplicas(), is(3));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getReplicas(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("pool-a", "pool-b")));

// Assert the info passed over for Cruise Control
Expand Down
2 changes: 2 additions & 0 deletions documentation/modules/appendix_crds.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3499,6 +3499,8 @@ Used in: xref:type-KafkaNodePool-{context}[`KafkaNodePool`]
|integer array
|clusterId 1.2+<.<a|Kafka cluster ID.
|string
|roles 1.2+<.<a|The roles currently assigned to this pool.
|string (one or more of [controller, broker]) array
|replicas 1.2+<.<a|The current number of pods being used to provide this resource.
|integer
|labelSelector 1.2+<.<a|Label selector for pods providing this resource.
Expand Down
Loading
Loading