[controller] Merge VeniceControllerConfig and VeniceControllerClusterConfig
nisargthakkar committed Jul 17, 2024
1 parent 979fd05 commit 58e92c4
Showing 13 changed files with 683 additions and 724 deletions.
@@ -231,7 +231,7 @@ private ConfigKeys() {
 
 // Controller specific configs
 public static final String CONTROLLER_CLUSTER_ZK_ADDRESSS = "controller.cluster.zk.address";
-/** Cluster name for all parent controllers */
+// Name of the Helix cluster for controllers
 public static final String CONTROLLER_CLUSTER = "controller.cluster.name";
 
 /** List of forbidden admin paths */
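For reference, a minimal sketch of reading the renamed key through VeniceProperties. The Properties-based constructor and the fallback value are assumptions for illustration, not part of this commit:

import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Properties;

public class ControllerClusterNameExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // The key documented above: names the Helix cluster that hosts the controllers.
    props.put(ConfigKeys.CONTROLLER_CLUSTER, "venice-controllers");
    VeniceProperties veniceProps = new VeniceProperties(props);
    // The "venice-controllers" fallback here is illustrative, not the library default.
    String controllerHelixCluster = veniceProps.getString(ConfigKeys.CONTROLLER_CLUSTER, "venice-controllers");
    System.out.println("Controller Helix cluster: " + controllerHelixCluster);
  }
}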
@@ -64,7 +64,7 @@ public class HelixVeniceClusterResources implements VeniceResource {
 private HelixCustomizedViewOfflinePushRepository customizedViewRepo;
 private final ReadWriteSchemaRepository schemaRepository;
 private final HelixStatusMessageChannel messageChannel;
-private final VeniceControllerClusterConfig config;
+private final VeniceControllerConfig config;
 private final PushMonitorDelegator pushMonitor;
 private final LeakedPushStatusCleanUpService leakedPushStatusCleanUpService;
 private final ZkRoutersClusterManager routersClusterManager;
@@ -367,7 +367,7 @@ public SafeHelixManager getHelixManager() {
 return helixManager;
 }
 
-public VeniceControllerClusterConfig getConfig() {
+public VeniceControllerConfig getConfig() {
 return config;
 }
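With getConfig() now returning the merged type, call sites read cluster-level settings straight off it. A rough sketch of that pattern — both getters are taken verbatim from hunks later in this commit, but the helper itself is hypothetical:

// Hypothetical helper; assumes a HelixVeniceClusterResources instance is at hand.
static void logClusterDefaults(HelixVeniceClusterResources resources) {
  VeniceControllerConfig config = resources.getConfig();
  int defaultReadQuotaPerRouter = config.getDefaultReadQuotaPerRouter();
  boolean compactHybridTopics = config.isKafkaLogCompactionForHybridStoresEnabled();
  System.out.println("read quota/router=" + defaultReadQuotaPerRouter + ", hybrid log compaction=" + compactHybridTopics);
}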

This file was deleted.

Large diffs are not rendered by default.

@@ -21,8 +21,8 @@ public class VeniceControllerMultiClusterConfig {
 public VeniceControllerMultiClusterConfig(Collection<VeniceProperties> controllerClusterProperties) {
 clusterToControllerConfigMap = new HashMap<>();
 for (VeniceProperties properties: controllerClusterProperties) {
-final VeniceControllerConfig controllerClusterConfig = new VeniceControllerConfig(properties);
-clusterToControllerConfigMap.put(controllerClusterConfig.getClusterName(), controllerClusterConfig);
+final VeniceControllerConfig controllerConfig = new VeniceControllerConfig(properties);
+clusterToControllerConfigMap.put(controllerConfig.getClusterName(), controllerConfig);
 }
 }
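The constructor above builds one merged config per cluster, keyed by cluster name. A hedged usage sketch — getControllerConfig matches the call sites later in this diff, but the property set below is deliberately incomplete, since a real VeniceControllerConfig requires many more entries (ZooKeeper address, Kafka bootstrap servers, and so on):

Properties raw = new Properties();
raw.put(ConfigKeys.CLUSTER_NAME, "venice-0"); // read back via getClusterName()
// ... other required controller properties elided ...
VeniceProperties clusterProps = new VeniceProperties(raw);

VeniceControllerMultiClusterConfig multiClusterConfigs =
    new VeniceControllerMultiClusterConfig(Collections.singletonList(clusterProps));

// Call sites in this commit resolve the merged config by cluster name:
VeniceControllerConfig config = multiClusterConfigs.getControllerConfig("venice-0");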
@@ -938,7 +938,7 @@ public void createStore(
 LOGGER.info("Start creating store {} in cluster {} with owner {}", storeName, clusterName, owner);
 try (AutoCloseableLock ignore = clusterResources.getClusterLockManager().createStoreWriteLock(storeName)) {
 checkPreConditionForCreateStore(clusterName, storeName, keySchema, valueSchema, isSystemStore, true);
-VeniceControllerClusterConfig config = getHelixVeniceClusterResources(clusterName).getConfig();
+VeniceControllerConfig config = getHelixVeniceClusterResources(clusterName).getConfig();
 Store newStore = new ZKStore(
 storeName,
 owner,
@@ -996,7 +996,7 @@ public void createStore(
 }
 }
 
-private void configureNewStore(Store newStore, VeniceControllerClusterConfig config, int largestUsedVersionNumber) {
+private void configureNewStore(Store newStore, VeniceControllerConfig config, int largestUsedVersionNumber) {
 newStore.setNativeReplicationEnabled(config.isNativeReplicationEnabledAsDefaultForBatchOnly());
 newStore.setActiveActiveReplicationEnabled(
 config.isActiveActiveReplicationEnabledAsDefaultForBatchOnly() && !newStore.isSystemStore());
@@ -1579,21 +1579,21 @@ private List<Version> getVersionsToMigrate(
 public Map<String, ControllerClient> getControllerClientMap(String clusterName) {
 return clusterControllerClientPerColoMap.computeIfAbsent(clusterName, cn -> {
 Map<String, ControllerClient> controllerClients = new HashMap<>();
-VeniceControllerConfig veniceControllerConfig = multiClusterConfigs.getControllerConfig(clusterName);
-veniceControllerConfig.getChildDataCenterControllerUrlMap()
+VeniceControllerConfig controllerConfig = multiClusterConfigs.getControllerConfig(clusterName);
+controllerConfig.getChildDataCenterControllerUrlMap()
 .entrySet()
 .forEach(
 entry -> controllerClients.put(
 entry.getKey(),
 ControllerClient.constructClusterControllerClient(clusterName, entry.getValue(), sslFactory)));
 
-veniceControllerConfig.getChildDataCenterControllerD2Map()
+controllerConfig.getChildDataCenterControllerD2Map()
 .entrySet()
 .forEach(
 entry -> controllerClients.put(
 entry.getKey(),
 new D2ControllerClient(
-veniceControllerConfig.getD2ServiceName(),
+controllerConfig.getD2ServiceName(),
 clusterName,
 entry.getValue(),
 sslFactory)));
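Since the map above is keyed by child region, a caller can fan out a query per region. An illustrative sketch, not code from this commit — it assumes the surrounding class's LOGGER plus clusterName/storeName are in scope, and uses ControllerClient.getStore(), which returns a StoreResponse:

Map<String, ControllerClient> perRegion = getControllerClientMap(clusterName);
perRegion.forEach((region, client) -> {
  StoreResponse response = client.getStore(storeName);
  if (response.isError()) {
    LOGGER.warn("Region {} returned an error for store {}: {}", region, storeName, response.getError());
  }
});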
@@ -2081,7 +2081,7 @@ public Version addVersionOnly(
 version.setPushType(pushType);
 store.addVersion(version);
 // Apply cluster-level native replication configs
-VeniceControllerClusterConfig clusterConfig = resources.getConfig();
+VeniceControllerConfig clusterConfig = resources.getConfig();
 
 boolean nativeReplicationEnabled = version.isNativeReplicationEnabled();
 
@@ -2180,15 +2180,15 @@ public void createSpecificVersionTopic(String clusterName, String storeName, Ver
 }
 checkControllerLeadershipFor(clusterName);
 try {
-VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
+VeniceControllerConfig controllerConfig = getHelixVeniceClusterResources(clusterName).getConfig();
 int amplificationFactor = version.getPartitionerConfig().getAmplificationFactor();
 topicToCreationTime.computeIfAbsent(version.kafkaTopicName(), topic -> System.currentTimeMillis());
 createBatchTopics(
 version,
 version.getPushType(),
 getTopicManager(),
 version.getPartitionCount() * amplificationFactor,
-clusterConfig,
+controllerConfig,
 false);
 } finally {
 topicToCreationTime.remove(version.kafkaTopicName());
@@ -2301,7 +2301,7 @@ private void createBatchTopics(
 PushType pushType,
 TopicManager topicManager,
 int partitionCount,
-VeniceControllerClusterConfig clusterConfig,
+VeniceControllerConfig clusterConfig,
 boolean useFastKafkaOperationTimeout) {
 List<PubSubTopic> topicNamesToCreate = new ArrayList<>(2);
 topicNamesToCreate.add(pubSubTopicRepository.getTopic(version.kafkaTopicName()));
@@ -2457,7 +2457,7 @@ private Pair<Boolean, Version> addVersion(
 OfflinePushStrategy offlinePushStrategy;
 int currentVersionBeforePush = -1;
 boolean isRepush = Version.isPushIdRePush(pushJobId);
-VeniceControllerClusterConfig clusterConfig = resources.getConfig();
+VeniceControllerConfig clusterConfig = resources.getConfig();
 BackupStrategy backupStrategy;
 
 try {
@@ -2865,7 +2865,7 @@ public Version incrementVersionIdempotent(
 "Request of creating versions/topics for targeted region push should only be sent to parent controller");
 }
 checkControllerLeadershipFor(clusterName);
-VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
+VeniceControllerConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
 int replicationMetadataVersionId = clusterConfig.getReplicationMetadataVersion();
 return pushType.isIncremental()
 ? getIncrementalPushVersion(clusterName, storeName)
@@ -2997,7 +2997,7 @@ public String getRealTimeTopic(String clusterName, String storeName) {
 }
 }
 
-VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
+VeniceControllerConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
 getTopicManager().createTopic(
 realTimeTopic,
 partitionCount,
@@ -3456,7 +3456,7 @@ public void retireOldStoreVersions(
 @Override
 public void topicCleanupWhenPushComplete(String clusterName, String storeName, int versionNumber) {
 HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName);
-VeniceControllerClusterConfig clusterConfig = resources.getConfig();
+VeniceControllerConfig clusterConfig = resources.getConfig();
 ReadWriteStoreRepository storeRepository = resources.getStoreMetadataRepository();
 Store store = storeRepository.getStore(storeName);
 if (store.isHybrid() && clusterConfig.isKafkaLogCompactionForHybridStoresEnabled()) {
@@ -4024,7 +4024,7 @@ public void setStoreOwner(String clusterName, String storeName, String owner) {
 */
 @Override
 public void setStorePartitionCount(String clusterName, String storeName, int partitionCount) {
-VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
+VeniceControllerConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
 storeMetadataUpdate(clusterName, storeName, store -> {
 preCheckStorePartitionCountUpdate(clusterName, store, partitionCount);
 // Do not update the partitionCount on the store.version as version config is immutable. The
@@ -4043,7 +4043,7 @@ public void setStorePartitionCount(String clusterName, String storeName, int par
 
 void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newPartitionCount) {
 String errorMessagePrefix = "Store update error for " + store.getName() + " in cluster: " + clusterName + ": ";
-VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
+VeniceControllerConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
 if (store.isHybrid() && store.getPartitionCount() != newPartitionCount) {
 // Allow the update if partition count is not configured and the new partition count matches RT partition count
 if (store.getPartitionCount() == 0) {
@@ -4261,7 +4261,7 @@ private void setRmdChunkingEnabled(String clusterName, String storeName, boolean
 
 void setIncrementalPushEnabled(String clusterName, String storeName, boolean incrementalPushEnabled) {
 storeMetadataUpdate(clusterName, storeName, store -> {
-VeniceControllerClusterConfig config = getHelixVeniceClusterResources(clusterName).getConfig();
+VeniceControllerConfig config = getHelixVeniceClusterResources(clusterName).getConfig();
 if (incrementalPushEnabled || store.isHybrid()) {
 // Enabling incremental push
 store.setNativeReplicationEnabled(config.isNativeReplicationEnabledAsDefaultForHybrid());
@@ -4644,7 +4644,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
 HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName);
 ZkRoutersClusterManager routersClusterManager = resources.getRoutersClusterManager();
 int routerCount = routersClusterManager.getLiveRoutersCount();
-VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
+VeniceControllerConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
 int defaultReadQuotaPerRouter = clusterConfig.getDefaultReadQuotaPerRouter();
 
 if (Math.max(defaultReadQuotaPerRouter, routerCount * defaultReadQuotaPerRouter) < readQuotaInCU.get()) {
@@ -4667,7 +4667,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
 setBootstrapToOnlineTimeoutInHours(clusterName, storeName, bootstrapToOnlineTimeoutInHours.get());
 }
 
-VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
+VeniceControllerConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
 if (newHybridStoreConfig.isPresent()) {
 // To fix the final variable problem in the lambda expression
 final HybridStoreConfig finalHybridConfig = newHybridStoreConfig.get();
@@ -5301,7 +5301,7 @@ public SchemaEntry getValueSchema(String clusterName, String storeName, int id)
 }
 
 private void validateValueSchemaUsingRandomGenerator(String schemaStr, String clusterName, String storeName) {
-VeniceControllerClusterConfig config = getHelixVeniceClusterResources(clusterName).getConfig();
+VeniceControllerConfig config = getHelixVeniceClusterResources(clusterName).getConfig();
 if (!config.isControllerSchemaValidationEnabled()) {
 return;
 }
@@ -6100,7 +6100,7 @@ private void createClusterIfRequired(String clusterName) {
 return;
 }
 
-VeniceControllerClusterConfig config = multiClusterConfigs.getControllerConfig(clusterName);
+VeniceControllerConfig config = multiClusterConfigs.getControllerConfig(clusterName);
 HelixConfigScope configScope =
 new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
 Map<String, String> helixClusterProperties = new HashMap<>();
@@ -6336,7 +6336,7 @@ public int calculateNumberOfPartitions(String clusterName, String storeName) {
 checkControllerLeadershipFor(clusterName);
 HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName);
 Store store = resources.getStoreMetadataRepository().getStoreOrThrow(storeName);
-VeniceControllerClusterConfig config = resources.getConfig();
+VeniceControllerConfig config = resources.getConfig();
 return PartitionUtils.calculatePartitionCount(
 storeName,
 store.getStorageQuotaInByte(),
@@ -1042,11 +1042,11 @@ private int getRmdVersionID(final String storeName, final String clusterName) {
 return store.getRmdVersion();
 }
 
-final VeniceControllerConfig controllerClusterConfig = getMultiClusterConfigs().getControllerConfig(clusterName);
-if (controllerClusterConfig == null) {
+final VeniceControllerConfig controllerConfig = getMultiClusterConfigs().getControllerConfig(clusterName);
+if (controllerConfig == null) {
 throw new VeniceException("No controller cluster config found for cluster " + clusterName);
 }
-final int rmdVersionID = controllerClusterConfig.getReplicationMetadataVersion();
+final int rmdVersionID = controllerConfig.getReplicationMetadataVersion();
 LOGGER.info("Use RMD version ID {} for cluster {}", rmdVersionID, clusterName);
 return rmdVersionID;
 }
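The hunk above prefers a store-level RMD version and only then falls back to the cluster default. Condensed into a standalone helper — the store-level guard is truncated in this rendering, so the non-negative check below is an assumption:

static int resolveRmdVersionId(Store store, VeniceControllerConfig controllerConfig) {
  // Assumption: a negative value means "unset" at the store level; the actual
  // guard condition is cut off in the rendered hunk above.
  if (store.getRmdVersion() >= 0) {
    return store.getRmdVersion();
  }
  return controllerConfig.getReplicationMetadataVersion();
}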
@@ -2399,8 +2399,8 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
 hybridDataReplicationPolicy,
 hybridBufferReplayPolicy);
 
-// Get VeniceControllerClusterConfig for the cluster
-VeniceControllerClusterConfig clusterConfig =
+// Get VeniceControllerConfig for the cluster
+VeniceControllerConfig controllerConfig =
 veniceHelixAdmin.getHelixVeniceClusterResources(clusterName).getConfig();
 // Check if the store is being converted to a hybrid store
 boolean storeBeingConvertedToHybrid = !currStore.isHybrid() && updatedHybridStoreConfig != null
@@ -2426,7 +2426,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
 // Enable active-active replication automatically when batch user store being converted to hybrid store and
 // active-active replication is enabled for all hybrid store via the cluster config
 if (storeBeingConvertedToHybrid && !setStore.activeActiveReplicationEnabled && !currStore.isSystemStore()
-&& clusterConfig.isActiveActiveReplicationEnabledAsDefaultForHybrid()) {
+&& controllerConfig.isActiveActiveReplicationEnabledAsDefaultForHybrid()) {
 setStore.activeActiveReplicationEnabled = true;
 updatedConfigsList.add(ACTIVE_ACTIVE_REPLICATION_ENABLED);
 if (!hybridDataReplicationPolicy.isPresent()) {
@@ -2453,7 +2453,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
 // replication is enabled or being and the cluster config allows it.
 if (!setStore.incrementalPushEnabled && !currStore.isSystemStore() && storeBeingConvertedToHybrid
 && setStore.activeActiveReplicationEnabled
-&& clusterConfig.enabledIncrementalPushForHybridActiveActiveUserStores()) {
+&& controllerConfig.enabledIncrementalPushForHybridActiveActiveUserStores()) {
 setStore.incrementalPushEnabled = true;
 updatedConfigsList.add(INCREMENTAL_PUSH_ENABLED);
 }
@@ -2682,8 +2682,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
 if (!getVeniceHelixAdmin().isHybrid(currStore.getHybridStoreConfig())
 && getVeniceHelixAdmin().isHybrid(setStore.getHybridStoreConfig()) && setStore.getPartitionNum() == 0) {
 // This is a new hybrid store and partition count is not specified.
-VeniceControllerClusterConfig config =
-getVeniceHelixAdmin().getHelixVeniceClusterResources(clusterName).getConfig();
+VeniceControllerConfig config = getVeniceHelixAdmin().getHelixVeniceClusterResources(clusterName).getConfig();
 setStore.setPartitionNum(
 PartitionUtils.calculatePartitionCount(
 storeName,
@@ -250,7 +250,7 @@ public void createVeniceStorageClusterResources(
 LeaderStandbySMD.name,
 IdealState.RebalanceMode.FULL_AUTO.toString(),
 AutoRebalanceStrategy.class.getName());
-VeniceControllerClusterConfig config = multiClusterConfigs.getControllerConfig(clusterName);
+VeniceControllerConfig config = multiClusterConfigs.getControllerConfig(clusterName);
 IdealState idealState = helixAdmin.getResourceIdealState(clusterName, kafkaTopic);
 // We don't set the delayed time per resource, we will use the cluster level helix config to decide
 // the delayed rebalance time
@@ -1,6 +1,6 @@
 package com.linkedin.venice.controller.util;
 
-import com.linkedin.venice.controller.VeniceControllerClusterConfig;
+import com.linkedin.venice.controller.VeniceControllerConfig;
 import com.linkedin.venice.controller.VeniceParentHelixAdmin;
 import com.linkedin.venice.controller.kafka.protocol.admin.UpdateStore;
 import com.linkedin.venice.exceptions.VeniceException;
@@ -45,7 +45,7 @@ public static boolean checkAndMaybeApplyPartialUpdateConfig(
 UpdateStore setStore,
 boolean storeBeingConvertedToHybrid) {
 Store currentStore = parentHelixAdmin.getVeniceHelixAdmin().getStore(clusterName, storeName);
-VeniceControllerClusterConfig clusterConfig =
+VeniceControllerConfig controllerConfig =
 parentHelixAdmin.getVeniceHelixAdmin().getHelixVeniceClusterResources(clusterName).getConfig();
 boolean partialUpdateConfigChanged = false;
 setStore.writeComputationEnabled = currentStore.isWriteComputationEnabled();
@@ -69,8 +69,8 @@ public static boolean checkAndMaybeApplyPartialUpdateConfig(
 */
 final boolean shouldEnablePartialUpdateBasedOnClusterConfig =
 storeBeingConvertedToHybrid && (setStore.activeActiveReplicationEnabled
-? clusterConfig.isEnablePartialUpdateForHybridActiveActiveUserStores()
-: clusterConfig.isEnablePartialUpdateForHybridNonActiveActiveUserStores());
+? controllerConfig.isEnablePartialUpdateForHybridActiveActiveUserStores()
+: controllerConfig.isEnablePartialUpdateForHybridNonActiveActiveUserStores());
 if (!currentStore.isWriteComputationEnabled() && shouldEnablePartialUpdateBasedOnClusterConfig) {
 LOGGER.info("Controller will try to enable partial update based on cluster config for store: " + storeName);
 /**