Ability to move data between JBOD disks using Cruise Control (#10644)
Signed-off-by: ShubhamRwt <[email protected]>
ShubhamRwt authored Nov 1, 2024
1 parent 1961072 commit a55eabf
Showing 23 changed files with 1,206 additions and 312 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## 0.45.0

* Add support for Kafka 3.8.1
* Ability to move data between JBOD disks using Cruise Control.

## 0.44.0

@@ -0,0 +1,71 @@
/*
 * Copyright Strimzi authors.
 * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
 */
package io.strimzi.api.kafka.model.rebalance;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import io.strimzi.api.kafka.model.common.Constants;
import io.strimzi.api.kafka.model.common.UnknownPropertyPreserving;
import io.strimzi.crdgenerator.annotations.Description;
import io.strimzi.crdgenerator.annotations.MinimumItems;
import io.sundr.builder.annotations.Buildable;
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Configures the broker and Volume IDs for the remove-disks endpoint for Cruise Control
 */
@Buildable(
        editableEnabled = false,
        builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonPropertyOrder({"brokerId", "volumeIds"})
@JsonInclude(JsonInclude.Include.NON_NULL)
@EqualsAndHashCode
@ToString
public class BrokerAndVolumeIds implements UnknownPropertyPreserving {

    private Integer brokerId;
    private List<Integer> volumeIds;
    private Map<String, Object> additionalProperties;

    @Description("ID of the broker that contains the disk from which you want to move the partition replicas.")
    @JsonInclude(JsonInclude.Include.NON_NULL)
    public Integer getBrokerId() {
        return brokerId;
    }

    public void setBrokerId(Integer brokerId) {
        this.brokerId = brokerId;
    }

    @Description("IDs of the disks from which the partition replicas need to be moved.")
    @JsonInclude(JsonInclude.Include.NON_NULL)
    @MinimumItems(1)
    public List<Integer> getVolumeIds() {
        return volumeIds;
    }

    public void setVolumeIds(List<Integer> volumeIds) {
        this.volumeIds = volumeIds;
    }

    @Override
    public Map<String, Object> getAdditionalProperties() {
        return this.additionalProperties != null ? this.additionalProperties : Map.of();
    }

    @Override
    public void setAdditionalProperty(String name, Object value) {
        if (this.additionalProperties == null) {
            this.additionalProperties = new HashMap<>(2);
        }
        this.additionalProperties.put(name, value);
    }
}
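Because the class is annotated with `@Buildable`, the build generates a companion `BrokerAndVolumeIdsBuilder`. As a minimal sketch (assuming the usual sundrio `with*` naming for the generated builder), one entry targeting broker 0 and its JBOD volumes 1 and 2 could be constructed like this:

import java.util.List;

import io.strimzi.api.kafka.model.rebalance.BrokerAndVolumeIds;
import io.strimzi.api.kafka.model.rebalance.BrokerAndVolumeIdsBuilder;

public class BrokerAndVolumeIdsExample {
    public static void main(String[] args) {
        // Drain JBOD volumes 1 and 2 on broker 0; the builder and its with* methods
        // are assumed to be generated by sundrio from the @Buildable annotation above.
        BrokerAndVolumeIds entry = new BrokerAndVolumeIdsBuilder()
                .withBrokerId(0)
                .withVolumeIds(List.of(1, 2))
                .build();

        System.out.println(entry.getBrokerId() + " -> " + entry.getVolumeIds());
    }
}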
@@ -14,7 +14,8 @@
public enum KafkaRebalanceMode {
    FULL("full"),
    ADD_BROKERS("add-brokers"),
    REMOVE_BROKERS("remove-brokers");
    REMOVE_BROKERS("remove-brokers"),
    REMOVE_DISKS("remove-disks");

    private final String name;

@@ -31,6 +32,8 @@ public static KafkaRebalanceMode forValue(String value) {
                return ADD_BROKERS;
            case "remove-brokers":
                return REMOVE_BROKERS;
            case "remove-disks":
                return REMOVE_DISKS;
            default:
                return null;
        }
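For context, the string used in the `KafkaRebalance` custom resource resolves to the new constant through `forValue`, falling back to `null` for unknown modes. A tiny standalone check (a hypothetical example class, not part of the commit) would be:

import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceMode;

public class KafkaRebalanceModeCheck {
    public static void main(String[] args) {
        // "remove-disks" maps to the new REMOVE_DISKS constant...
        System.out.println(KafkaRebalanceMode.forValue("remove-disks") == KafkaRebalanceMode.REMOVE_DISKS); // true
        // ...while unrecognised strings fall through to null, as in the switch above.
        System.out.println(KafkaRebalanceMode.forValue("not-a-mode")); // null
    }
}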
@@ -10,6 +10,7 @@
import io.strimzi.api.kafka.model.common.Spec;
import io.strimzi.crdgenerator.annotations.Description;
import io.strimzi.crdgenerator.annotations.Minimum;
import io.strimzi.crdgenerator.annotations.MinimumItems;
import io.sundr.builder.annotations.Buildable;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -23,7 +24,7 @@
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({ "mode", "brokers", "goals", "skipHardGoalCheck", "rebalanceDisk", "excludedTopics", "concurrentPartitionMovementsPerBroker",
        "concurrentIntraBrokerPartitionMovements", "concurrentLeaderMovements", "replicationThrottle", "replicaMovementStrategies" })
        "concurrentIntraBrokerPartitionMovements", "concurrentLeaderMovements", "replicationThrottle", "replicaMovementStrategies", "moveReplicasOffVolumes" })
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaRebalanceSpec extends Spec {
@@ -45,13 +46,15 @@ public class KafkaRebalanceSpec extends Spec {
    private int concurrentLeaderMovements;
    private long replicationThrottle;
    private List<String> replicaMovementStrategies;
    private List<BrokerAndVolumeIds> moveReplicasOffVolumes;

    @Description("Mode to run the rebalancing. " +
            "The supported modes are `full`, `add-brokers`, `remove-brokers`.\n" +
            "If not specified, the `full` mode is used by default. \n\n" +
            "* `full` mode runs the rebalancing across all the brokers in the cluster.\n" +
            "* `add-brokers` mode can be used after scaling up the cluster to move some replicas to the newly added brokers.\n" +
            "* `remove-brokers` mode can be used before scaling down the cluster to move replicas out of the brokers to be removed.\n")
            "* `remove-brokers` mode can be used before scaling down the cluster to move replicas out of the brokers to be removed.\n" +
            "* `remove-disks` mode can be used to move data across the volumes within the same broker\n")
    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    public KafkaRebalanceMode getMode() {
        return mode;
@@ -166,4 +169,14 @@ public List<String> getReplicaMovementStrategies() {
    public void setReplicaMovementStrategies(List<String> replicaMovementStrategies) {
        this.replicaMovementStrategies = replicaMovementStrategies;
    }

    @Description("List of brokers and their corresponding volumes from which replicas need to be moved.")
    @MinimumItems(1)
    public List<BrokerAndVolumeIds> getMoveReplicasOffVolumes() {
        return moveReplicasOffVolumes;
    }

    public void setMoveReplicasOffVolumes(List<BrokerAndVolumeIds> moveReplicasOffVolumes) {
        this.moveReplicasOffVolumes = moveReplicasOffVolumes;
    }
}
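Putting the new pieces together, a `KafkaRebalance` in `remove-disks` mode can be assembled programmatically with the generated fluent builders. This is a sketch only: the `with*`/`end*` method names are assumed from the usual sundrio generation pattern, and `my-cluster` is a placeholder cluster name.

import java.util.List;

import io.strimzi.api.kafka.model.rebalance.BrokerAndVolumeIdsBuilder;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalance;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceBuilder;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceMode;

public class RemoveDisksRebalanceExample {
    public static void main(String[] args) {
        // Java counterpart of the KafkaRebalance-remove-disks.yaml fixture added further down:
        // move all replicas off volume 1 of broker 0 onto its remaining JBOD volumes.
        KafkaRebalance rebalance = new KafkaRebalanceBuilder()
                .withNewMetadata()
                    .withName("my-rebalance")
                    // The operator matches the rebalance to its Kafka cluster via this label.
                    .addToLabels("strimzi.io/cluster", "my-cluster")
                .endMetadata()
                .withNewSpec()
                    .withMode(KafkaRebalanceMode.REMOVE_DISKS)
                    .withMoveReplicasOffVolumes(
                            new BrokerAndVolumeIdsBuilder()
                                    .withBrokerId(0)
                                    .withVolumeIds(List.of(1))
                                    .build())
                .endSpec()
                .build();

        System.out.println(rebalance);
    }
}

Applied to the cluster, such a resource declares the same intra-broker movement as the remove-disks YAML example shown later in this commit.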
@@ -67,6 +67,29 @@ void testKafkaRebalanceRemoveBroker() {
        createDeleteCustomResource("KafkaRebalance-remove-brokers.yaml");
    }

    @Test
    void testKafkaRebalanceRemoveDisks() {
        createDeleteCustomResource("KafkaRebalance-remove-disks.yaml");
    }

    @Test
    void testKafkaRebalanceRemoveDisksWithEmptyVolumes() {
        Throwable exception = assertThrows(
                KubernetesClientException.class,
                () -> createDeleteCustomResource("KafkaRebalance-remove-disks-with-empty-volumes.yaml"));

        assertThat(exception.getMessage(), containsString("spec.moveReplicasOffVolumes[0].volumeIds: Invalid value: 0: spec.moveReplicasOffVolumes[0].volumeIds in body should have at least 1 items."));
    }

    @Test
    void testKafkaRebalanceRemoveDisksWithEmptyBrokerAndVolumes() {
        Throwable exception = assertThrows(
                KubernetesClientException.class,
                () -> createDeleteCustomResource("KafkaRebalance-remove-disks-with-empty-broker-and-volumes.yaml"));

        assertThat(exception.getMessage(), containsString("spec.moveReplicasOffVolumes: Invalid value: 0: spec.moveReplicasOffVolumes in body should have at least 1 items."));
    }

    @Test
    void testKafkaRebalanceWrongMode() {
        Throwable exception = assertThrows(
@@ -0,0 +1,7 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
spec:
  mode: remove-disks
  moveReplicasOffVolumes: []
@@ -0,0 +1,9 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
spec:
  mode: remove-disks
  moveReplicasOffVolumes:
    - brokerId: 0
      volumeIds: []
@@ -0,0 +1,9 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
spec:
  mode: remove-disks
  moveReplicasOffVolumes:
    - brokerId: 0
      volumeIds: [1]