Skip to content

Commit

Permalink
[controller] Add instance stoppable check based on CV states
Browse files Browse the repository at this point in the history
  • Loading branch information
Sourav Maji committed Sep 25, 2024
1 parent 328d72a commit fde0153
Show file tree
Hide file tree
Showing 13 changed files with 258 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ public enum Arg {
), RECOVER_CLUSTER("recover-cluster", "rc", true, "Cluster to recover from"),
BACKUP_FOLDER("backup-folder", "bf", true, "Backup folder path"),
DEBUG("debug", "d", false, "Print debugging messages for execute-data-recovery"),
BLOB_TRANSFER_ENABLED("blob-transfer-enabled", "bt", true, "Flag to indicate if the blob transfer is allowed or not");
BLOB_TRANSFER_ENABLED("blob-transfer-enabled", "bt", true, "Flag to indicate if the blob transfer is allowed or not"),
STORAGE_NODES("storage-nodes", "sns", true, "Input list of storage nodes to check if they can removed or not"),
TO_BE_STOPPED_NODES("to-be-stopped-nodes", "tbsn", true, "List of nodes assumed to be stopped");

private final String argName;
private final String first;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import static com.linkedin.venice.Arg.STARTING_OFFSET;
import static com.linkedin.venice.Arg.START_DATE;
import static com.linkedin.venice.Arg.STORAGE_NODE;
import static com.linkedin.venice.Arg.STORAGE_NODES;
import static com.linkedin.venice.Arg.STORAGE_NODE_READ_QUOTA_ENABLED;
import static com.linkedin.venice.Arg.STORAGE_PERSONA;
import static com.linkedin.venice.Arg.STORAGE_QUOTA;
Expand All @@ -118,6 +119,7 @@
import static com.linkedin.venice.Arg.STORE_TYPE;
import static com.linkedin.venice.Arg.STORE_VIEW_CONFIGS;
import static com.linkedin.venice.Arg.SYSTEM_STORE_TYPE;
import static com.linkedin.venice.Arg.TO_BE_STOPPED_NODES;
import static com.linkedin.venice.Arg.UNUSED_SCHEMA_DELETION_ENABLED;
import static com.linkedin.venice.Arg.URL;
import static com.linkedin.venice.Arg.VALUE_SCHEMA;
Expand Down Expand Up @@ -540,6 +542,11 @@ public enum Command {
"extract-venice-zk-paths",
"Extract Venice-specific paths from a ZK snapshot input text file to an output text file",
new Arg[] { INFILE, OUTFILE, CLUSTER_LIST, BASE_PATH }
),
CLUSTER_HEALTH_STATUS(
"cluster-health-status",
"Returns the set of instances which can be safely remove and instances which cannot be removed.",
new Arg[] { URL, CLUSTER, TO_BE_STOPPED_NODES, STORAGE_NODES }
);

private final String commandName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public class ControllerApiConstants {
public static final String FROZEN = "frozen";
public static final String ERROR = "error";
public static final String STORAGE_NODE_ID = "storage_node_id"; /* host_port */

public static final String INSTANCES = "INSTANCES";
public static final String TO_BE_STOPPED_INSTANCES = "TO_BE_STOPPED_INSTANCES";

public static final String LOCKED_STORAGE_NODE_IDS = "locked_storage_node_ids";
public static final String INSTANCE_VIEW = "instance_view";
public static final String KEY_SCHEMA = "key_schema";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.HEARTBEAT_TIMESTAMP;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCLUDE_SYSTEM_STORES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCREMENTAL_PUSH_VERSION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.INSTANCES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_SYSTEM_STORE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_WRITE_COMPUTE_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.KAFKA_TOPIC_LOG_COMPACTION_ENABLED;
Expand Down Expand Up @@ -65,6 +66,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_TYPE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGETED_REGIONS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TOPIC;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TO_BE_STOPPED_INSTANCES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPSTREAM_OFFSET;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.VALUE_SCHEMA;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION;
Expand Down Expand Up @@ -913,6 +915,17 @@ public MultiNodeResponse listStorageNodes() {
return request(ControllerRoute.LIST_NODES, newParams(), MultiNodeResponse.class);
}

public StoppableNodeStatusResponse getStoppableInstanceStatus(
String clusterName,
String instances,
String toBeStoppedInstances) {
QueryParams params = newParams().add(CLUSTER, clusterName)
.add(INSTANCES, instances)
.add(TO_BE_STOPPED_INSTANCES, toBeStoppedInstances);

return request(ControllerRoute.CLUSTER_HEALTH_STATUS, params, StoppableNodeStatusResponse.class);
}

public MultiNodesStatusResponse listInstancesStatuses(boolean enableReplicas) {
QueryParams params = newParams();
if (enableReplicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.HYBRID_STORE_OVERHEAD_BYPASS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCLUDE_SYSTEM_STORES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCREMENTAL_PUSH_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.INSTANCES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.INSTANCE_VIEW;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_SYSTEM_STORE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.KAFKA_TOPIC_LOG_COMPACTION_ENABLED;
Expand Down Expand Up @@ -84,6 +85,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_TYPE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TOPIC;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TOPIC_COMPACTION_POLICY;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TO_BE_STOPPED_INSTANCES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPSTREAM_OFFSET;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.VALUE_SCHEMA;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION;
Expand Down Expand Up @@ -134,6 +136,9 @@ public enum ControllerRoute {
ROLLBACK_TO_BACKUP_VERSION(
"/rollback_to_backup_version", HttpMethod.POST, Collections.singletonList(NAME), REGIONS_FILTER
),
CLUSTER_HEALTH_STATUS(
"/cluster_health_status", HttpMethod.GET, Arrays.asList(CLUSTER, INSTANCES, TO_BE_STOPPED_INSTANCES)
),
ROLL_FORWARD_TO_FUTURE_VERSION(
"/roll_forward_to_future_version", HttpMethod.POST, Collections.singletonList(NAME), REGIONS_FILTER
),
Expand Down Expand Up @@ -164,6 +169,7 @@ public enum ControllerRoute {
ALLOW_LIST_REMOVE_NODE("/allow_list_remove_node", HttpMethod.POST, Collections.singletonList(STORAGE_NODE_ID)),

REMOVE_NODE("/remove_node", HttpMethod.POST, Collections.singletonList(STORAGE_NODE_ID)),

SKIP_ADMIN("/skip_admin_message", HttpMethod.POST, Collections.singletonList(OFFSET)),

GET_KEY_SCHEMA("/get_key_schema", HttpMethod.GET, Collections.singletonList(NAME)),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.linkedin.venice.controllerapi;

import java.util.List;
import java.util.Map;


public class StoppableNodeStatusResponse extends ControllerResponse {
private List<String> stoppableInstances;
private Map<String, String> nonStoppableInstancesStatusMap;

public Map<String, String> getNonStoppableInstancesStatusMap() {
return nonStoppableInstancesStatusMap;
}

public void setNonStoppableInstancesStatusMap(Map<String, String> nonStoppableInstancesStatusMap) {
this.nonStoppableInstancesStatusMap = nonStoppableInstancesStatusMap;
}

public List<String> getStoppableInstances() {
return stoppableInstances;
}

public void setStoppableInstances(List<String> remoableInstances) {
this.stoppableInstances = remoableInstances;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.controller;

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.StoppableNodeStatusResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.integration.utils.ServiceFactory;
Expand Down Expand Up @@ -87,6 +88,9 @@ public void testIsInstanceRemovableDuringPush() {
client.isNodeRemovable(Utils.getHelixNodeIdentifier(Utils.getHostName(), serverPort2)).isRemovable());
Assert.assertTrue(
client.isNodeRemovable(Utils.getHelixNodeIdentifier(Utils.getHostName(), serverPort3)).isRemovable());
String a = Utils.getHelixNodeIdentifier(Utils.getHostName(), serverPort1);
String b = Utils.getHelixNodeIdentifier(Utils.getHostName(), serverPort2);
StoppableNodeStatusResponse statuses = client.getStoppableInstanceStatus(clusterName, a + "," + b, "");

/*
* This is the same scenario as we would do later in the following test steps.
Expand Down Expand Up @@ -182,6 +186,10 @@ public void testIsInstanceRemovableAfterPush() {
try (ControllerClient client = new ControllerClient(clusterName, urls)) {
Assert.assertTrue(
client.isNodeRemovable(Utils.getHelixNodeIdentifier(Utils.getHostName(), serverPort1)).isRemovable());
String a = Utils.getHelixNodeIdentifier(Utils.getHostName(), serverPort1);
String b = Utils.getHelixNodeIdentifier(Utils.getHostName(), serverPort2);
StoppableNodeStatusResponse statuses = client.getStoppableInstanceStatus(clusterName, a + "," + b, "");

Assert.assertFalse(
client
.isNodeRemovable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,11 @@ String getNativeReplicationSourceFabric(

TopicManager getTopicManager(String pubSubServerAddress);

InstanceRemovableStatuses getInstanceRemovableStatuses(
String cluster,
Set<String> instances,
List<String> toBeStoppedInstances);

/**
* Check if this controller itself is the leader controller for a given cluster or not. Note that the controller can be
* either a parent controller or a child controller since a cluster must have a leader child controller and a leader
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.linkedin.venice.controller;

import java.util.List;
import java.util.Map;


public class InstanceRemovableStatuses {
private List<String> stoppableInstances;
private Map<String, String> nonStoppableInstancesStatusMap;

private String redirectUrl;

public void setRedirectUrl(String redirectUrl) {
this.redirectUrl = redirectUrl;
}

public String getRedirectUrl() {
return redirectUrl;
}

public Map<String, String> getNonStoppableInstancesStatusMap() {
return nonStoppableInstancesStatusMap;
}

public void setNonStoppableInstancesStatusMap(Map<String, String> nonStoppableInstancesStatusMap) {
this.nonStoppableInstancesStatusMap = nonStoppableInstancesStatusMap;
}

public List<String> getStoppableInstances() {
return stoppableInstances;
}

public void setStoppableInstances(List<String> remoableInstances) {
this.stoppableInstances = remoableInstances;
}

public enum NonStoppableReason {
WILL_LOSE_DATA, MIN_ACTIVE_REPLICA_VIOLATION, ONGOING_MAINTENANCE, UNKNOWN_INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6465,6 +6465,92 @@ public boolean isLeaderControllerFor(String clusterName) {
return model.isLeader();
}

@Override
public InstanceRemovableStatuses getInstanceRemovableStatuses(
String cluster,
Set<String> instances,
List<String> toBeStoppedInstances) {
InstanceRemovableStatuses statuses = new InstanceRemovableStatuses();

// If current controller is not the leader, redirect with leader URL
if (!isLeaderControllerFor(cluster)) {
Instance instance = getLeaderController(cluster);
statuses.setRedirectUrl(instance.getNodeId());
return statuses;
}

Map<String, String> nonStoppableInstances = new HashMap<>();
List<String> stoppableInstances = new ArrayList<>();
statuses.setNonStoppableInstancesStatusMap(nonStoppableInstances);
statuses.setStoppableInstances(stoppableInstances);

// Check for maintenance, for ongoing maintenance none can be removed.
HelixVeniceClusterResources resources = getHelixVeniceClusterResources(cluster);
MaintenanceSignal maintenanceSignal = HelixUtils.getClusterMaintenanceSignal(cluster, resources.getHelixManager());

if (maintenanceSignal != null) {
for (String instanceId: instances) {
nonStoppableInstances.put(instanceId, InstanceRemovableStatuses.NonStoppableReason.ONGOING_MAINTENANCE.name());
}
return statuses;
}

List<String> allInstances = helixAdminClient.getInstancesInCluster(cluster);
HelixCustomizedViewOfflinePushRepository customizedViewRepo = resources.getCustomizedViewRepository();
ReadWriteStoreRepository storeRepo = getHelixVeniceClusterResources(cluster).getStoreMetadataRepository();

for (String instanceId: instances) {
// If not part of the cluster, mark non-stoppable as it could be part of other cluster.
if (!allInstances.contains(instanceId)) {
nonStoppableInstances.put(instanceId, InstanceRemovableStatuses.NonStoppableReason.UNKNOWN_INSTANCE.name());
continue;
} else if (!HelixUtils
.isLiveInstance(cluster, instanceId, getHelixVeniceClusterResources(cluster).getHelixManager())) {
stoppableInstances.add(instanceId);
continue;
}
List<Replica> localReplicas = Utils.getReplicasForInstance(customizedViewRepo, instanceId);
Instance instance = Instance.fromNodeId(instanceId);
ResourceAssignment resourceAssn = customizedViewRepo.getResourceAssignment();
for (Replica replica: localReplicas) {
// Skip if replica is not current version.
if (!Utils.isCurrentVersion(replica.getResource(), storeRepo)) {
continue;
}
Version version = storeRepo.getStore(Version.parseStoreFromKafkaTopicName(replica.getResource()))
.getVersion(Version.parseVersionFromKafkaTopicName(replica.getResource()));
int replicationFactor = version != null ? version.getReplicationFactor() : 3;
List<Instance> readyToServeInstances = customizedViewRepo.getReadyToServeInstances(
resourceAssn.getPartitionAssignment(replica.getResource()),
replica.getPartitionId());
int numReplicas = readyToServeInstances.size();
if (numReplicas < 2) {
nonStoppableInstances.put(instanceId, InstanceRemovableStatuses.NonStoppableReason.WILL_LOSE_DATA.name());
break;
} else {
if (numReplicas <= replicationFactor) {
nonStoppableInstances
.put(instanceId, InstanceRemovableStatuses.NonStoppableReason.MIN_ACTIVE_REPLICA_VIOLATION.name());
break;
}
// Check if other replicas are already counted as stoppable in earlier iteration, we cannot remove it.
for (Instance readyInstance: readyToServeInstances) {
if ((numReplicas <= replicationFactor + 1) && stoppableInstances.contains(readyInstance.getNodeId())) {
nonStoppableInstances
.put(instanceId, InstanceRemovableStatuses.NonStoppableReason.MIN_ACTIVE_REPLICA_VIOLATION.name());
break;
}
}
}
}
if (!nonStoppableInstances.containsKey(instanceId)) {
stoppableInstances.add(instanceId);
}
}

return statuses;
}

/**
* Calculate number of partition for given store.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3771,6 +3771,14 @@ public TopicManager getTopicManager(String pubSubServerAddress) {
return getVeniceHelixAdmin().getTopicManager(pubSubServerAddress);
}

@Override
public InstanceRemovableStatuses getInstanceRemovableStatuses(
String cluster,
Set<String> instances,
List<String> toBeStoppedInstances) {
throw new VeniceException("getReplicasOfStorageNode is not supported!");
}

/**
* @see VeniceHelixAdmin#isLeaderControllerFor(String)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static com.linkedin.venice.controllerapi.ControllerRoute.CHECK_RESOURCE_CLEANUP_FOR_STORE_CREATION;
import static com.linkedin.venice.controllerapi.ControllerRoute.CLEANUP_INSTANCE_CUSTOMIZED_STATES;
import static com.linkedin.venice.controllerapi.ControllerRoute.CLUSTER_DISCOVERY;
import static com.linkedin.venice.controllerapi.ControllerRoute.CLUSTER_HEALTH_STATUS;
import static com.linkedin.venice.controllerapi.ControllerRoute.CLUSTER_HEALTH_STORES;
import static com.linkedin.venice.controllerapi.ControllerRoute.COMPARE_STORE;
import static com.linkedin.venice.controllerapi.ControllerRoute.COMPLETE_MIGRATION;
Expand Down Expand Up @@ -408,6 +409,9 @@ public boolean startInner() throws Exception {
httpService.post(
SEND_HEARTBEAT_TIMESTAMP_TO_SYSTEM_STORE.getPath(),
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.sendHeartbeatToSystemStore(admin)));
httpService.get(
CLUSTER_HEALTH_STATUS.getPath(),
new VeniceParentControllerRegionStateHandler(admin, controllerRoutes.getClusterStoppableInstanceStatus(admin)));
httpService.get(
GET_HEARTBEAT_TIMESTAMP_FROM_SYSTEM_STORE.getPath(),
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.getHeartbeatFromSystemStore(admin)));
Expand Down
Loading

0 comments on commit fde0153

Please sign in to comment.