Skip to content

Commit

Permalink
Retain VeniceControllerClusterConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
nisargthakkar committed Jul 19, 2024
1 parent 58e92c4 commit 2b174d7
Show file tree
Hide file tree
Showing 35 changed files with 320 additions and 315 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class AbstractTestVeniceHelixAdmin {
VeniceHelixAdmin veniceAdmin;
String clusterName;
String storeOwner = "Doge of Venice";
VeniceControllerConfig controllerConfig;
VeniceControllerClusterConfig clusterConfig;
String zkAddress;
ZkServerWrapper zkServerWrapper;
PubSubBrokerWrapper pubSubBrokerWrapper;
Expand Down Expand Up @@ -104,8 +104,8 @@ public void setupCluster(boolean createParticipantStore, MetricsRepository metri
properties.put(UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED, true);
controllerProps = new VeniceProperties(properties);
helixMessageChannelStats = new HelixMessageChannelStats(new MetricsRepository(), clusterName);
controllerConfig = new VeniceControllerConfig(controllerProps);
multiClusterConfig = TestUtils.getMultiClusterConfigFromOneCluster(controllerConfig);
clusterConfig = new VeniceControllerClusterConfig(controllerProps);
multiClusterConfig = TestUtils.getMultiClusterConfigFromOneCluster(clusterConfig);
veniceAdmin = new VeniceHelixAdmin(
multiClusterConfig,
metricsRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public void testControllerFailOver() throws Exception {
veniceAdmin.createStore(clusterName, storeName, "dev", KEY_SCHEMA, VALUE_SCHEMA);
Version version =
veniceAdmin.incrementVersionIdempotent(clusterName, storeName, Version.guidBasedDummyPushId(), 1, 1);
int newAdminPort = controllerConfig.getAdminPort() + 1; /* Note: this is a dummy port */
int newAdminPort = clusterConfig.getAdminPort() + 1; /* Note: this is a dummy port */
PropertyBuilder builder = new PropertyBuilder().put(controllerProps.toProperties()).put("admin.port", newAdminPort);
VeniceProperties newControllerProps = builder.build();
VeniceControllerConfig newConfig = new VeniceControllerConfig(newControllerProps);
VeniceControllerClusterConfig newConfig = new VeniceControllerClusterConfig(newControllerProps);
VeniceHelixAdmin newAdmin = new VeniceHelixAdmin(
TestUtils.getMultiClusterConfigFromOneCluster(newConfig),
new MetricsRepository(),
Expand Down Expand Up @@ -308,12 +308,12 @@ public void testIsInstanceRemovableForRunningPush() throws Exception {
public void testGetLeaderController() {
Assert.assertEquals(
veniceAdmin.getLeaderController(clusterName).getNodeId(),
Utils.getHelixNodeIdentifier(controllerConfig.getAdminHostname(), controllerConfig.getAdminPort()));
Utils.getHelixNodeIdentifier(clusterConfig.getAdminHostname(), clusterConfig.getAdminPort()));
// Create a new controller and test getLeaderController again.
int newAdminPort = controllerConfig.getAdminPort() - 10;
int newAdminPort = clusterConfig.getAdminPort() - 10;
PropertyBuilder builder = new PropertyBuilder().put(controllerProps.toProperties()).put("admin.port", newAdminPort);
VeniceProperties newControllerProps = builder.build();
VeniceControllerConfig newConfig = new VeniceControllerConfig(newControllerProps);
VeniceControllerClusterConfig newConfig = new VeniceControllerClusterConfig(newControllerProps);
VeniceHelixAdmin newLeaderAdmin = new VeniceHelixAdmin(
TestUtils.getMultiClusterConfigFromOneCluster(newConfig),
new MetricsRepository(),
Expand All @@ -328,18 +328,18 @@ public void testGetLeaderController() {
if (veniceAdmin.isLeaderControllerFor(clusterName)) {
Assert.assertEquals(
veniceAdmin.getLeaderController(clusterName).getNodeId(),
Utils.getHelixNodeIdentifier(controllerConfig.getAdminHostname(), controllerConfig.getAdminPort()));
Utils.getHelixNodeIdentifier(clusterConfig.getAdminHostname(), clusterConfig.getAdminPort()));
} else {
Assert.assertEquals(
veniceAdmin.getLeaderController(clusterName).getNodeId(),
Utils.getHelixNodeIdentifier(controllerConfig.getAdminHostname(), newAdminPort));
Utils.getHelixNodeIdentifier(clusterConfig.getAdminHostname(), newAdminPort));
}
newLeaderAdmin.stop(clusterName);
admins.remove(newLeaderAdmin);
waitForALeader(admins, clusterName, LEADER_CHANGE_TIMEOUT_MS);
Assert.assertEquals(
veniceAdmin.getLeaderController(clusterName).getNodeId(),
Utils.getHelixNodeIdentifier(controllerConfig.getAdminHostname(), controllerConfig.getAdminPort()),
Utils.getHelixNodeIdentifier(clusterConfig.getAdminHostname(), clusterConfig.getAdminPort()),
"Controller should be back to original one.");
veniceAdmin.stop(clusterName);
TestUtils.waitForNonDeterministicCompletion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ public void testIsLeaderController() {
veniceAdmin.isLeaderControllerFor(clusterName),
"The default controller should be the leader controller.");

int newAdminPort = controllerConfig.getAdminPort() + 1; /* Note: dummy port */
int newAdminPort = clusterConfig.getAdminPort() + 1; /* Note: dummy port */
PropertyBuilder builder = new PropertyBuilder().put(controllerProps.toProperties()).put("admin.port", newAdminPort);

VeniceProperties newControllerProps = builder.build();
VeniceControllerConfig newConfig = new VeniceControllerConfig(newControllerProps);
VeniceControllerClusterConfig newConfig = new VeniceControllerClusterConfig(newControllerProps);
VeniceHelixAdmin newLeaderAdmin = new VeniceHelixAdmin(
TestUtils.getMultiClusterConfigFromOneCluster(newConfig),
new MetricsRepository(),
Expand Down Expand Up @@ -183,7 +183,7 @@ public void testMultiCluster() {
new PropertyBuilder().put(controllerProps.toProperties()).put("cluster.name", newClusterName);

VeniceProperties newClusterProps = builder.build();
VeniceControllerConfig newClusterConfig = new VeniceControllerConfig(newClusterProps);
VeniceControllerClusterConfig newClusterConfig = new VeniceControllerClusterConfig(newClusterProps);
veniceAdmin.addConfig(newClusterConfig);
veniceAdmin.initStorageCluster(newClusterName);
waitUntilIsLeader(veniceAdmin, newClusterName, LEADER_CHANGE_TIMEOUT_MS);
Expand All @@ -194,9 +194,9 @@ public void testMultiCluster() {

@Test(timeOut = TOTAL_TIMEOUT_FOR_SHORT_TEST_MS)
public void testGetNumberOfPartition() {
long partitionSize = controllerConfig.getPartitionSize();
int maxPartitionNumber = controllerConfig.getMaxNumberOfPartitions();
int minPartitionNumber = controllerConfig.getMinNumberOfPartitions();
long partitionSize = clusterConfig.getPartitionSize();
int maxPartitionNumber = clusterConfig.getMaxNumberOfPartitions();
int minPartitionNumber = clusterConfig.getMinNumberOfPartitions();
String storeName = Utils.getUniqueString("test");

veniceAdmin.createStore(clusterName, storeName, "dev", KEY_SCHEMA, VALUE_SCHEMA);
Expand Down Expand Up @@ -236,9 +236,9 @@ public void testGetNumberOfPartition() {

@Test(timeOut = TOTAL_TIMEOUT_FOR_SHORT_TEST_MS)
public void testGetNumberOfPartitionsFromStoreLevelConfig() {
long partitionSize = controllerConfig.getPartitionSize();
int maxPartitionNumber = controllerConfig.getMaxNumberOfPartitions();
int minPartitionNumber = controllerConfig.getMinNumberOfPartitions();
long partitionSize = clusterConfig.getPartitionSize();
int maxPartitionNumber = clusterConfig.getMaxNumberOfPartitions();
int minPartitionNumber = clusterConfig.getMinNumberOfPartitions();
String storeName = Utils.getUniqueString("test");

veniceAdmin.createStore(clusterName, storeName, "dev", KEY_SCHEMA, VALUE_SCHEMA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@ private HelixVeniceClusterResources getVeniceHelixResources(String cluster, Metr
.getReadOnlyZKSharedSystemStoreRepository();
doReturn(mock(HelixReadOnlyZKSharedSchemaRepository.class)).when(veniceHelixAdmin)
.getReadOnlyZKSharedSchemaRepository();
VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class);
when(controllerConfig.getDaVinciPushStatusScanThreadNumber()).thenReturn(4);
when(controllerConfig.getDaVinciPushStatusScanIntervalInSeconds()).thenReturn(5);
when(controllerConfig.isDaVinciPushStatusEnabled()).thenReturn(true);
when(controllerConfig.getOffLineJobWaitTimeInMilliseconds()).thenReturn(120000L);
VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class);
when(clusterConfig.getDaVinciPushStatusScanThreadNumber()).thenReturn(4);
when(clusterConfig.getDaVinciPushStatusScanIntervalInSeconds()).thenReturn(5);
when(clusterConfig.isDaVinciPushStatusEnabled()).thenReturn(true);
when(clusterConfig.getOffLineJobWaitTimeInMilliseconds()).thenReturn(120000L);
return new HelixVeniceClusterResources(
cluster,
zkClient,
new HelixAdapterSerializer(),
new SafeHelixManager(controller),
mock(VeniceControllerConfig.class),
mock(VeniceControllerClusterConfig.class),
veniceHelixAdmin,
metricsRepository,
mock(RealTimeTopicSwitcher.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import com.linkedin.venice.compression.NoopCompressor;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.compression.ZstdWithDictCompressor;
import com.linkedin.venice.controller.VeniceControllerConfig;
import com.linkedin.venice.controller.VeniceControllerClusterConfig;
import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
Expand Down Expand Up @@ -545,9 +545,9 @@ public static OffsetRecord getOffsetRecord(long currentOffset, Optional<Long> en
}

public static VeniceControllerMultiClusterConfig getMultiClusterConfigFromOneCluster(
VeniceControllerConfig controllerConfig) {
Map<String, VeniceControllerConfig> configMap = new HashMap<>();
configMap.put(controllerConfig.getClusterName(), controllerConfig);
VeniceControllerClusterConfig clusterConfig) {
Map<String, VeniceControllerClusterConfig> configMap = new HashMap<>();
configMap.put(clusterConfig.getClusterName(), clusterConfig);
return new VeniceControllerMultiClusterConfig(configMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class HelixVeniceClusterResources implements VeniceResource {
private HelixCustomizedViewOfflinePushRepository customizedViewRepo;
private final ReadWriteSchemaRepository schemaRepository;
private final HelixStatusMessageChannel messageChannel;
private final VeniceControllerConfig config;
private final VeniceControllerClusterConfig clusterConfig;
private final PushMonitorDelegator pushMonitor;
private final LeakedPushStatusCleanUpService leakedPushStatusCleanUpService;
private final ZkRoutersClusterManager routersClusterManager;
Expand All @@ -84,20 +84,20 @@ public HelixVeniceClusterResources(
ZkClient zkClient,
HelixAdapterSerializer adapterSerializer,
SafeHelixManager helixManager,
VeniceControllerConfig config,
VeniceControllerClusterConfig clusterConfig,
VeniceHelixAdmin admin,
MetricsRepository metricsRepository,
RealTimeTopicSwitcher realTimeTopicSwitcher,
Optional<DynamicAccessController> accessController,
HelixAdminClient helixAdminClient) {
this.clusterName = clusterName;
this.config = config;
this.clusterConfig = clusterConfig;
this.helixManager = helixManager;
this.admin = admin;
/**
* So far, Meta system store doesn't support write from parent cluster.
*/
if (!config.isParent()) {
if (!clusterConfig.isParent()) {
metaStoreWriter = Optional.of(admin.getMetaStoreWriter());
} else {
metaStoreWriter = Optional.empty();
Expand Down Expand Up @@ -142,16 +142,16 @@ public HelixVeniceClusterResources(
this.messageChannel = new HelixStatusMessageChannel(
helixManager,
new HelixMessageChannelStats(metricsRepository, clusterName),
config.getHelixSendMessageTimeoutMs());
clusterConfig.getHelixSendMessageTimeoutMs());
VeniceOfflinePushMonitorAccessor offlinePushMonitorAccessor = new VeniceOfflinePushMonitorAccessor(
clusterName,
zkClient,
adapterSerializer,
config.getRefreshAttemptsForZkReconnect(),
config.getRefreshIntervalForZkReconnectInMs());
clusterConfig.getRefreshAttemptsForZkReconnect(),
clusterConfig.getRefreshIntervalForZkReconnectInMs());
String aggregateRealTimeSourceKafkaUrl =
config.getChildDataCenterKafkaUrlMap().get(config.getAggregateRealTimeSourceRegion());
boolean unregisterMetricEnabled = config.isUnregisterMetricForDeletedStoreEnabled();
clusterConfig.getChildDataCenterKafkaUrlMap().get(clusterConfig.getAggregateRealTimeSourceRegion());
boolean unregisterMetricEnabled = clusterConfig.isUnregisterMetricForDeletedStoreEnabled();

this.pushMonitor = new PushMonitorDelegator(
clusterName,
Expand All @@ -163,9 +163,9 @@ public HelixVeniceClusterResources(
realTimeTopicSwitcher,
clusterLockManager,
aggregateRealTimeSourceKafkaUrl,
getActiveActiveRealTimeSourceKafkaURLs(config),
getActiveActiveRealTimeSourceKafkaURLs(clusterConfig),
helixAdminClient,
config,
clusterConfig,
admin.getPushStatusStoreReader(),
admin.getDisabledPartitionStats(clusterName));

Expand All @@ -175,16 +175,16 @@ public HelixVeniceClusterResources(
storeMetadataRepository,
admin,
new AggPushStatusCleanUpStats(clusterName, metricsRepository, storeMetadataRepository, unregisterMetricEnabled),
this.config.getLeakedPushStatusCleanUpServiceSleepIntervalInMs(),
this.config.getLeakedResourceAllowedLingerTimeInMs());
this.clusterConfig.getLeakedPushStatusCleanUpServiceSleepIntervalInMs(),
this.clusterConfig.getLeakedResourceAllowedLingerTimeInMs());
// On controller side, router cluster manager is used as an accessor without maintaining any cache, so do not need
// to refresh once zk reconnected.
this.routersClusterManager = new ZkRoutersClusterManager(
zkClient,
adapterSerializer,
clusterName,
config.getRefreshAttemptsForZkReconnect(),
config.getRefreshIntervalForZkReconnectInMs());
clusterConfig.getRefreshAttemptsForZkReconnect(),
clusterConfig.getRefreshIntervalForZkReconnectInMs());
this.aggPartitionHealthStats = new AggPartitionHealthStats(
clusterName,
metricsRepository,
Expand All @@ -193,32 +193,32 @@ public HelixVeniceClusterResources(
pushMonitor);
this.storeConfigAccessor = new ZkStoreConfigAccessor(zkClient, adapterSerializer, metaStoreWriter);
this.accessController = accessController;
if (config.getErrorPartitionAutoResetLimit() > 0) {
if (clusterConfig.getErrorPartitionAutoResetLimit() > 0) {
errorPartitionResetTask = new ErrorPartitionResetTask(
clusterName,
helixAdminClient,
storeMetadataRepository,
routingDataRepository,
pushMonitor,
metricsRepository,
config.getErrorPartitionAutoResetLimit(),
config.getErrorPartitionProcessingCycleDelay());
clusterConfig.getErrorPartitionAutoResetLimit(),
clusterConfig.getErrorPartitionProcessingCycleDelay());
}
veniceAdminStats = new VeniceAdminStats(metricsRepository, "venice-admin-" + clusterName);
this.storagePersonaRepository =
new StoragePersonaRepository(clusterName, this.storeMetadataRepository, adapterSerializer, zkClient);
}

private List<String> getActiveActiveRealTimeSourceKafkaURLs(VeniceControllerConfig config) {
List<String> kafkaURLs = new ArrayList<>(config.getActiveActiveRealTimeSourceFabrics().size());
for (String fabric: config.getActiveActiveRealTimeSourceFabrics()) {
String kafkaURL = config.getChildDataCenterKafkaUrlMap().get(fabric);
private List<String> getActiveActiveRealTimeSourceKafkaURLs(VeniceControllerClusterConfig clusterConfig) {
List<String> kafkaURLs = new ArrayList<>(clusterConfig.getActiveActiveRealTimeSourceFabrics().size());
for (String fabric: clusterConfig.getActiveActiveRealTimeSourceFabrics()) {
String kafkaURL = clusterConfig.getChildDataCenterKafkaUrlMap().get(fabric);
if (kafkaURL == null) {
throw new VeniceException(
String.format(
"No A/A source Kafka URL found for fabric %s in %s",
fabric,
config.getChildDataCenterKafkaUrlMap()));
clusterConfig.getChildDataCenterKafkaUrlMap()));
}
kafkaURLs.add(kafkaURL);
}
Expand All @@ -241,12 +241,12 @@ private void repairStoreReplicationFactor(ReadWriteStoreRepository metadataRepos
*/
if (store.getReplicationFactor() <= 0) {
int previousReplicationFactor = store.getReplicationFactor();
store.setReplicationFactor(config.getReplicationFactor());
store.setReplicationFactor(clusterConfig.getReplicationFactor());
metadataRepository.updateStore(store);
LOGGER.info(
"Updated replication factor from {} to {} for store: {}, in cluster: {}",
previousReplicationFactor,
config.getReplicationFactor(),
clusterConfig.getReplicationFactor(),
store.getName(),
clusterName);
}
Expand Down Expand Up @@ -367,8 +367,8 @@ public SafeHelixManager getHelixManager() {
return helixManager;
}

public VeniceControllerConfig getConfig() {
return config;
public VeniceControllerClusterConfig getConfig() {
return clusterConfig;
}

public PushMonitorDelegator getPushMonitor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void run() {
time.sleep((long) sleepIntervalBetweenListFetchMinutes * Time.MS_PER_MINUTE);
// loop all clusters
for (String clusterName: multiClusterConfig.getClusters()) {
VeniceControllerConfig clusterConfig = multiClusterConfig.getControllerConfig(clusterName);
VeniceControllerClusterConfig clusterConfig = multiClusterConfig.getControllerConfig(clusterName);
boolean cleanupEnabled = clusterConfig.isStoreGraveyardCleanupEnabled();
int delayInMinutes = clusterConfig.getStoreGraveyardCleanupDelayMinutes();
if (!cleanupEnabled || !admin.isLeaderControllerFor(clusterName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ public UserSystemStoreLifeCycleHelper(
this.parentAdmin = parentAdmin;
this.authorizerService = authorizerService;
for (String cluster: multiClusterConfig.getClusters()) {
VeniceControllerConfig controllerConfig = multiClusterConfig.getControllerConfig(cluster);
VeniceControllerClusterConfig clusterConfig = multiClusterConfig.getControllerConfig(cluster);
Set<VeniceSystemStoreType> autoCreateEnabledSystemStores = new HashSet<>();
if (controllerConfig.isZkSharedMetaSystemSchemaStoreAutoCreationEnabled()
&& controllerConfig.isAutoMaterializeMetaSystemStoreEnabled()) {
if (clusterConfig.isZkSharedMetaSystemSchemaStoreAutoCreationEnabled()
&& clusterConfig.isAutoMaterializeMetaSystemStoreEnabled()) {
autoCreateEnabledSystemStores.add(VeniceSystemStoreType.META_STORE);
}
if (controllerConfig.isZkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled()
&& controllerConfig.isAutoMaterializeDaVinciPushStatusSystemStoreEnabled()) {
if (clusterConfig.isZkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled()
&& clusterConfig.isAutoMaterializeDaVinciPushStatusSystemStoreEnabled()) {
autoCreateEnabledSystemStores.add(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE);
}
clusterToAutoCreateEnabledSystemStoresMap.put(cluster, autoCreateEnabledSystemStores);
Expand Down
Loading

0 comments on commit 2b174d7

Please sign in to comment.