Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[system test] Auto-rebalance feature test case #10666

Merged
merged 10 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,31 @@
* [cruise-control](labels/cruise-control.md)


## testAutoKafkaRebalanceScaleUpScaleDown

**Description:** Test the Kafka Cruise Control auto-rebalance mechanism during scaling up and down of brokers.

**Steps:**

| Step | Action | Result |
| - | - | - |
| 1. | Create broker and controller KafkaNodePools. | Both KafkaNodePools are successfully created. |
| 2. | Create KafkaRebalance templates for scale-up and scale-down operations. | KafkaRebalance templates are created and annotated as configuration templates. |
| 3. | Deploy Kafka cluster with Cruise Control using the defined templates for auto-rebalance. | Kafka cluster with Cruise Control is deployed, configured with the specified auto-rebalance templates. |
| 4. | Ensure the Kafka auto-rebalance status is Idle. | Kafka auto-rebalance status is confirmed to be Idle. |
| 5. | Scale Kafka up to a higher number of brokers. | Kafka brokers are scaled up, and Cruise Control initiates rebalancing in ADD_BROKERS mode. |
| 6. | Verify that Kafka auto-rebalance status transitions to RebalanceOnScaleUp and then back to Idle. | Auto-rebalance status moves to RebalanceOnScaleUp during scaling and returns to Idle after rebalancing completes. |
| 7. | Check that topic replicas are moved to the new brokers. | Topic replicas are distributed onto the newly added brokers. |
| 8. | Change number of replicas of Kafka cluster to initial replicas within KafkaNodePool (i.e., 3 brokers) | KafkaNodePool is set successfully to 3 replicas and auto-rebalance is triggered. |
ppatierno marked this conversation as resolved.
Show resolved Hide resolved
| 9. | After auto-rebalance is done, Kafka cluster is scale-down to the original number of brokers. | Kafka brokers are scaled down because there are no hosted partitions anymore that would prevent such an operation. |
| 10. | Verify that Kafka auto-rebalance status transitions to RebalanceOnScaleDown and then back to Idle. | Auto-rebalance status moves to RebalanceOnScaleDown during scaling down and returns to Idle after rebalancing completes. |
| 11. | Confirm that the cluster is stable after scaling operations. | Cluster returns to a stable state with initial number of brokers and Cruise Control completed the rebalancing. |

**Labels:**

* [cruise-control](labels/cruise-control.md)


## testCruiseControlChangesFromRebalancingtoProposalReadyWhenSpecUpdated

**Description:** Test that ensures Cruise Control transitions from Rebalancing to ProposalReady state when the KafkaRebalance spec is updated.
Expand Down
1 change: 1 addition & 0 deletions development-docs/systemtests/labels/cruise-control.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Ensuring the correctness of Cruise Control behavior under different configuratio
**Tests:**
- [testAutoCreationOfCruiseControlTopicsWithResources](../io.strimzi.systemtest.cruisecontrol.CruiseControlST.md)
- [testConfigurationUpdate](../io.strimzi.systemtest.cruisecontrol.CruiseControlConfigurationST.md)
- [testAutoKafkaRebalanceScaleUpScaleDown](../io.strimzi.systemtest.cruisecontrol.CruiseControlST.md)
- [testCruiseControlWithSingleNodeKafka](../io.strimzi.systemtest.cruisecontrol.CruiseControlST.md)
- [testKafkaRebalanceAutoApprovalMechanism](../io.strimzi.systemtest.cruisecontrol.CruiseControlST.md)
- [testCruiseControlDuringBrokerScaleUpAndDown](../io.strimzi.systemtest.cruisecontrol.CruiseControlST.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,12 @@ public static boolean waitForKafkaRebalanceReadiness(KafkaRebalance resource) {

return true;
}

public static void waitForKafkaRebalanceIsPresent(final String namespaceName, final String kafkaRebalanceName) {
TestUtils.waitFor("KafkaRebalance is present", TestConstants.GLOBAL_POLL_INTERVAL, TestConstants.GLOBAL_STATUS_TIMEOUT, () -> KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(kafkaRebalanceName).get() != null);
}

public static void waitForKafkaRebalanceIsDeleted(final String namespaceName, final String kafkaRebalanceName) {
TestUtils.waitFor("KafkaRebalance is deleted", TestConstants.GLOBAL_POLL_INTERVAL, TestConstants.GLOBAL_STATUS_TIMEOUT, () -> KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(kafkaRebalanceName).get() == null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaMetadataState;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.api.kafka.model.kafka.cruisecontrol.KafkaAutoRebalanceState;
import io.strimzi.api.kafka.model.kafka.cruisecontrol.KafkaAutoRebalanceStatusBrokers;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
import io.strimzi.api.kafka.model.kafka.listener.ListenerStatus;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceMode;
import io.strimzi.kafka.config.model.ConfigModel;
import io.strimzi.kafka.config.model.ConfigModels;
import io.strimzi.kafka.config.model.Scope;
Expand Down Expand Up @@ -645,4 +648,68 @@ public static void verifyKafkaKraftMetadataLog(final TestStorage testStorage, fi
private static String buildDirectoryPath(int volumeId, int kafkaIndex) {
return String.format("/var/lib/kafka/data-%d/kafka-log%d/__cluster_metadata-0", volumeId, kafkaIndex);
}

/**
* Waits until the Kafka cluster in the given namespace the specified AutoRebalance state is set.
*
* @param namespaceName The name of the Kubernetes namespace where the Kafka cluster resides.
* @param clusterName The name of the Kafka cluster.
* @param expectedState The expected {@link KafkaAutoRebalanceState} that is set.
*/
public static void waitUntilKafkaHasAutoRebalanceState(final String namespaceName,
final String clusterName,
final KafkaAutoRebalanceState expectedState) {
TestUtils.waitFor(
String.format("Kafka cluster %s/%s and KafkaRebalance state: '%s'", clusterName, namespaceName, expectedState),
TestConstants.GLOBAL_POLL_INTERVAL,
TestConstants.GLOBAL_TIMEOUT,
() -> {
Kafka kafka = KafkaResource.kafkaClient().inNamespace(namespaceName).withName(clusterName).get();
if (kafka == null || kafka.getStatus() == null) {
return false; // Kafka cluster or its status is not available
}

final KafkaAutoRebalanceState currentState = kafka.getStatus().getAutoRebalance() != null
? kafka.getStatus().getAutoRebalance().getState() : null;

return currentState != null && currentState.equals(expectedState);
}
);
}

/**
* Waits until the Kafka cluster in the given namespace reaches the specified AutoRebalance mode and brokers.
see-quick marked this conversation as resolved.
Show resolved Hide resolved
*
* @param namespaceName The name of the Kubernetes namespace where the Kafka cluster resides.
* @param clusterName The name of the Kafka cluster.
* @param expectedKafkaAutoRebalanceMode The expected {@link KafkaRebalanceMode} that the Kafka cluster should reach.
see-quick marked this conversation as resolved.
Show resolved Hide resolved
* @param scalingBrokers A list of expected broker IDs that the Kafka cluster should have in AutoRebalance state.
*/
public static void waitUntilKafkaHasExpectedAutoRebalanceModeAndBrokers(final String namespaceName,
final String clusterName,
final KafkaRebalanceMode expectedKafkaAutoRebalanceMode,
final List<Integer> scalingBrokers) {
TestUtils.waitFor(
String.format("Kafka cluster %s/%s and KafkaRebalance mode '%s' with brokers %s",
namespaceName, clusterName, expectedKafkaAutoRebalanceMode, scalingBrokers),
TestConstants.GLOBAL_POLL_INTERVAL,
TestConstants.GLOBAL_TIMEOUT,
() -> {
final Kafka kafka = KafkaResource.kafkaClient().inNamespace(namespaceName).withName(clusterName).get();
if (kafka == null || kafka.getStatus() == null || kafka.getStatus().getAutoRebalance() == null) {
return false; // Kafka cluster, its status, or AutoRebalance status is not available
}

final List<KafkaAutoRebalanceStatusBrokers> autoRebalanceModeBrokersList = kafka.getStatus().getAutoRebalance().getModes();
if (autoRebalanceModeBrokersList == null) {
return false;
}

return autoRebalanceModeBrokersList.stream()
.anyMatch(modeBrokers -> modeBrokers.getMode().equals(expectedKafkaAutoRebalanceMode)
&& modeBrokers.getBrokers().equals(scalingBrokers));
}
);
}

}
Loading
Loading