From 2b174d7cae3eff9a22c14cd3d3f197462b52d56b Mon Sep 17 00:00:00 2001 From: Nisarg Thakkar Date: Fri, 19 Jul 2024 11:36:54 -0700 Subject: [PATCH] Retain VeniceControllerClusterConfig --- .../AbstractTestVeniceHelixAdmin.java | 6 +- ...niceHelixAdminWithIsolatedEnvironment.java | 16 +-- ...VeniceHelixAdminWithSharedEnvironment.java | 18 +-- .../controller/TestVeniceHelixResources.java | 12 +- .../com/linkedin/venice/utils/TestUtils.java | 8 +- .../HelixVeniceClusterResources.java | 54 ++++---- .../StoreGraveyardCleanupService.java | 2 +- .../UserSystemStoreLifeCycleHelper.java | 10 +- .../venice/controller/VeniceController.java | 3 +- ...ava => VeniceControllerClusterConfig.java} | 6 +- .../VeniceControllerMultiClusterConfig.java | 16 +-- .../controller/VeniceControllerService.java | 2 +- .../VeniceControllerStateModel.java | 2 +- .../venice/controller/VeniceHelixAdmin.java | 130 +++++++++--------- .../controller/VeniceParentHelixAdmin.java | 42 +++--- .../venice/controller/ZkHelixAdminClient.java | 4 +- .../kafka/consumer/AdminConsumerService.java | 44 +++--- .../ParentControllerConfigUpdateUtils.java | 8 +- .../pushmonitor/AbstractPushMonitor.java | 24 ++-- .../PartitionStatusBasedPushMonitor.java | 6 +- .../pushmonitor/PushMonitorDelegator.java | 6 +- .../AbstractTestVeniceParentHelixAdmin.java | 42 +++--- .../TestDisabledPartitionEnablerService.java | 6 +- .../TestStoreBackupVersionCleanupService.java | 16 +-- .../TestUnusedValueSchemaCleanupService.java | 10 +- ...=> TestVeniceControllerClusterConfig.java} | 16 +-- .../TestVeniceHelixAdminWithoutCluster.java | 6 +- .../TestVeniceParentHelixAdmin.java | 48 ++++--- .../SystemStoreInitializationHelperTest.java | 12 +- .../kafka/TestTopicCleanupService.java | 14 +- ...icCleanupServiceForMultiKafkaClusters.java | 8 +- ...opicCleanupServiceForParentController.java | 8 +- .../consumer/TestAdminConsumerService.java | 8 +- ...ParentControllerConfigUpdateUtilsTest.java | 14 +- .../pushmonitor/AbstractPushMonitorTest.java | 8 +- 35 files changed, 320 insertions(+), 315 deletions(-) rename services/venice-controller/src/main/java/com/linkedin/venice/controller/{VeniceControllerConfig.java => VeniceControllerClusterConfig.java} (99%) rename services/venice-controller/src/test/java/com/linkedin/venice/controller/{TestVeniceControllerConfig.java => TestVeniceControllerClusterConfig.java} (83%) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AbstractTestVeniceHelixAdmin.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AbstractTestVeniceHelixAdmin.java index 74bc22e3f6..2612654ed7 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AbstractTestVeniceHelixAdmin.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AbstractTestVeniceHelixAdmin.java @@ -68,7 +68,7 @@ class AbstractTestVeniceHelixAdmin { VeniceHelixAdmin veniceAdmin; String clusterName; String storeOwner = "Doge of Venice"; - VeniceControllerConfig controllerConfig; + VeniceControllerClusterConfig clusterConfig; String zkAddress; ZkServerWrapper zkServerWrapper; PubSubBrokerWrapper pubSubBrokerWrapper; @@ -104,8 +104,8 @@ public void setupCluster(boolean createParticipantStore, MetricsRepository metri properties.put(UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED, true); controllerProps = new VeniceProperties(properties); helixMessageChannelStats = new HelixMessageChannelStats(new MetricsRepository(), clusterName); - 
controllerConfig = new VeniceControllerConfig(controllerProps); - multiClusterConfig = TestUtils.getMultiClusterConfigFromOneCluster(controllerConfig); + clusterConfig = new VeniceControllerClusterConfig(controllerProps); + multiClusterConfig = TestUtils.getMultiClusterConfigFromOneCluster(clusterConfig); veniceAdmin = new VeniceHelixAdmin( multiClusterConfig, metricsRepository, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithIsolatedEnvironment.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithIsolatedEnvironment.java index fe066fc172..f9c40a579c 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithIsolatedEnvironment.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithIsolatedEnvironment.java @@ -56,10 +56,10 @@ public void testControllerFailOver() throws Exception { veniceAdmin.createStore(clusterName, storeName, "dev", KEY_SCHEMA, VALUE_SCHEMA); Version version = veniceAdmin.incrementVersionIdempotent(clusterName, storeName, Version.guidBasedDummyPushId(), 1, 1); - int newAdminPort = controllerConfig.getAdminPort() + 1; /* Note: this is a dummy port */ + int newAdminPort = clusterConfig.getAdminPort() + 1; /* Note: this is a dummy port */ PropertyBuilder builder = new PropertyBuilder().put(controllerProps.toProperties()).put("admin.port", newAdminPort); VeniceProperties newControllerProps = builder.build(); - VeniceControllerConfig newConfig = new VeniceControllerConfig(newControllerProps); + VeniceControllerClusterConfig newConfig = new VeniceControllerClusterConfig(newControllerProps); VeniceHelixAdmin newAdmin = new VeniceHelixAdmin( TestUtils.getMultiClusterConfigFromOneCluster(newConfig), new MetricsRepository(), @@ -308,12 +308,12 @@ public void testIsInstanceRemovableForRunningPush() throws Exception { public void testGetLeaderController() { Assert.assertEquals( veniceAdmin.getLeaderController(clusterName).getNodeId(), - Utils.getHelixNodeIdentifier(controllerConfig.getAdminHostname(), controllerConfig.getAdminPort())); + Utils.getHelixNodeIdentifier(clusterConfig.getAdminHostname(), clusterConfig.getAdminPort())); // Create a new controller and test getLeaderController again. 
- int newAdminPort = controllerConfig.getAdminPort() - 10; + int newAdminPort = clusterConfig.getAdminPort() - 10; PropertyBuilder builder = new PropertyBuilder().put(controllerProps.toProperties()).put("admin.port", newAdminPort); VeniceProperties newControllerProps = builder.build(); - VeniceControllerConfig newConfig = new VeniceControllerConfig(newControllerProps); + VeniceControllerClusterConfig newConfig = new VeniceControllerClusterConfig(newControllerProps); VeniceHelixAdmin newLeaderAdmin = new VeniceHelixAdmin( TestUtils.getMultiClusterConfigFromOneCluster(newConfig), new MetricsRepository(), @@ -328,18 +328,18 @@ public void testGetLeaderController() { if (veniceAdmin.isLeaderControllerFor(clusterName)) { Assert.assertEquals( veniceAdmin.getLeaderController(clusterName).getNodeId(), - Utils.getHelixNodeIdentifier(controllerConfig.getAdminHostname(), controllerConfig.getAdminPort())); + Utils.getHelixNodeIdentifier(clusterConfig.getAdminHostname(), clusterConfig.getAdminPort())); } else { Assert.assertEquals( veniceAdmin.getLeaderController(clusterName).getNodeId(), - Utils.getHelixNodeIdentifier(controllerConfig.getAdminHostname(), newAdminPort)); + Utils.getHelixNodeIdentifier(clusterConfig.getAdminHostname(), newAdminPort)); } newLeaderAdmin.stop(clusterName); admins.remove(newLeaderAdmin); waitForALeader(admins, clusterName, LEADER_CHANGE_TIMEOUT_MS); Assert.assertEquals( veniceAdmin.getLeaderController(clusterName).getNodeId(), - Utils.getHelixNodeIdentifier(controllerConfig.getAdminHostname(), controllerConfig.getAdminPort()), + Utils.getHelixNodeIdentifier(clusterConfig.getAdminHostname(), clusterConfig.getAdminPort()), "Controller should be back to original one."); veniceAdmin.stop(clusterName); TestUtils.waitForNonDeterministicCompletion( diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java index 71cf2426e1..6540356c50 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java @@ -139,11 +139,11 @@ public void testIsLeaderController() { veniceAdmin.isLeaderControllerFor(clusterName), "The default controller should be the leader controller."); - int newAdminPort = controllerConfig.getAdminPort() + 1; /* Note: dummy port */ + int newAdminPort = clusterConfig.getAdminPort() + 1; /* Note: dummy port */ PropertyBuilder builder = new PropertyBuilder().put(controllerProps.toProperties()).put("admin.port", newAdminPort); VeniceProperties newControllerProps = builder.build(); - VeniceControllerConfig newConfig = new VeniceControllerConfig(newControllerProps); + VeniceControllerClusterConfig newConfig = new VeniceControllerClusterConfig(newControllerProps); VeniceHelixAdmin newLeaderAdmin = new VeniceHelixAdmin( TestUtils.getMultiClusterConfigFromOneCluster(newConfig), new MetricsRepository(), @@ -183,7 +183,7 @@ public void testMultiCluster() { new PropertyBuilder().put(controllerProps.toProperties()).put("cluster.name", newClusterName); VeniceProperties newClusterProps = builder.build(); - VeniceControllerConfig newClusterConfig = new VeniceControllerConfig(newClusterProps); + VeniceControllerClusterConfig newClusterConfig = new 
VeniceControllerClusterConfig(newClusterProps); veniceAdmin.addConfig(newClusterConfig); veniceAdmin.initStorageCluster(newClusterName); waitUntilIsLeader(veniceAdmin, newClusterName, LEADER_CHANGE_TIMEOUT_MS); @@ -194,9 +194,9 @@ public void testMultiCluster() { @Test(timeOut = TOTAL_TIMEOUT_FOR_SHORT_TEST_MS) public void testGetNumberOfPartition() { - long partitionSize = controllerConfig.getPartitionSize(); - int maxPartitionNumber = controllerConfig.getMaxNumberOfPartitions(); - int minPartitionNumber = controllerConfig.getMinNumberOfPartitions(); + long partitionSize = clusterConfig.getPartitionSize(); + int maxPartitionNumber = clusterConfig.getMaxNumberOfPartitions(); + int minPartitionNumber = clusterConfig.getMinNumberOfPartitions(); String storeName = Utils.getUniqueString("test"); veniceAdmin.createStore(clusterName, storeName, "dev", KEY_SCHEMA, VALUE_SCHEMA); @@ -236,9 +236,9 @@ public void testGetNumberOfPartition() { @Test(timeOut = TOTAL_TIMEOUT_FOR_SHORT_TEST_MS) public void testGetNumberOfPartitionsFromStoreLevelConfig() { - long partitionSize = controllerConfig.getPartitionSize(); - int maxPartitionNumber = controllerConfig.getMaxNumberOfPartitions(); - int minPartitionNumber = controllerConfig.getMinNumberOfPartitions(); + long partitionSize = clusterConfig.getPartitionSize(); + int maxPartitionNumber = clusterConfig.getMaxNumberOfPartitions(); + int minPartitionNumber = clusterConfig.getMinNumberOfPartitions(); String storeName = Utils.getUniqueString("test"); veniceAdmin.createStore(clusterName, storeName, "dev", KEY_SCHEMA, VALUE_SCHEMA); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixResources.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixResources.java index d9d3a77e78..0b7f3aad22 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixResources.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixResources.java @@ -57,17 +57,17 @@ private HelixVeniceClusterResources getVeniceHelixResources(String cluster, Metr .getReadOnlyZKSharedSystemStoreRepository(); doReturn(mock(HelixReadOnlyZKSharedSchemaRepository.class)).when(veniceHelixAdmin) .getReadOnlyZKSharedSchemaRepository(); - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - when(controllerConfig.getDaVinciPushStatusScanThreadNumber()).thenReturn(4); - when(controllerConfig.getDaVinciPushStatusScanIntervalInSeconds()).thenReturn(5); - when(controllerConfig.isDaVinciPushStatusEnabled()).thenReturn(true); - when(controllerConfig.getOffLineJobWaitTimeInMilliseconds()).thenReturn(120000L); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + when(clusterConfig.getDaVinciPushStatusScanThreadNumber()).thenReturn(4); + when(clusterConfig.getDaVinciPushStatusScanIntervalInSeconds()).thenReturn(5); + when(clusterConfig.isDaVinciPushStatusEnabled()).thenReturn(true); + when(clusterConfig.getOffLineJobWaitTimeInMilliseconds()).thenReturn(120000L); return new HelixVeniceClusterResources( cluster, zkClient, new HelixAdapterSerializer(), new SafeHelixManager(controller), - mock(VeniceControllerConfig.class), + mock(VeniceControllerClusterConfig.class), veniceHelixAdmin, metricsRepository, mock(RealTimeTopicSwitcher.class), diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java 
b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java index 75fa68db6b..faddb302e4 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java @@ -31,7 +31,7 @@ import com.linkedin.venice.compression.NoopCompressor; import com.linkedin.venice.compression.VeniceCompressor; import com.linkedin.venice.compression.ZstdWithDictCompressor; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.ControllerResponse; @@ -545,9 +545,9 @@ public static OffsetRecord getOffsetRecord(long currentOffset, Optional en } public static VeniceControllerMultiClusterConfig getMultiClusterConfigFromOneCluster( - VeniceControllerConfig controllerConfig) { - Map configMap = new HashMap<>(); - configMap.put(controllerConfig.getClusterName(), controllerConfig); + VeniceControllerClusterConfig clusterConfig) { + Map configMap = new HashMap<>(); + configMap.put(clusterConfig.getClusterName(), clusterConfig); return new VeniceControllerMultiClusterConfig(configMap); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java index 7e2e84cfec..1177185652 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java @@ -64,7 +64,7 @@ public class HelixVeniceClusterResources implements VeniceResource { private HelixCustomizedViewOfflinePushRepository customizedViewRepo; private final ReadWriteSchemaRepository schemaRepository; private final HelixStatusMessageChannel messageChannel; - private final VeniceControllerConfig config; + private final VeniceControllerClusterConfig clusterConfig; private final PushMonitorDelegator pushMonitor; private final LeakedPushStatusCleanUpService leakedPushStatusCleanUpService; private final ZkRoutersClusterManager routersClusterManager; @@ -84,20 +84,20 @@ public HelixVeniceClusterResources( ZkClient zkClient, HelixAdapterSerializer adapterSerializer, SafeHelixManager helixManager, - VeniceControllerConfig config, + VeniceControllerClusterConfig clusterConfig, VeniceHelixAdmin admin, MetricsRepository metricsRepository, RealTimeTopicSwitcher realTimeTopicSwitcher, Optional accessController, HelixAdminClient helixAdminClient) { this.clusterName = clusterName; - this.config = config; + this.clusterConfig = clusterConfig; this.helixManager = helixManager; this.admin = admin; /** * So far, Meta system store doesn't support write from parent cluster. 
*/ - if (!config.isParent()) { + if (!clusterConfig.isParent()) { metaStoreWriter = Optional.of(admin.getMetaStoreWriter()); } else { metaStoreWriter = Optional.empty(); @@ -142,16 +142,16 @@ public HelixVeniceClusterResources( this.messageChannel = new HelixStatusMessageChannel( helixManager, new HelixMessageChannelStats(metricsRepository, clusterName), - config.getHelixSendMessageTimeoutMs()); + clusterConfig.getHelixSendMessageTimeoutMs()); VeniceOfflinePushMonitorAccessor offlinePushMonitorAccessor = new VeniceOfflinePushMonitorAccessor( clusterName, zkClient, adapterSerializer, - config.getRefreshAttemptsForZkReconnect(), - config.getRefreshIntervalForZkReconnectInMs()); + clusterConfig.getRefreshAttemptsForZkReconnect(), + clusterConfig.getRefreshIntervalForZkReconnectInMs()); String aggregateRealTimeSourceKafkaUrl = - config.getChildDataCenterKafkaUrlMap().get(config.getAggregateRealTimeSourceRegion()); - boolean unregisterMetricEnabled = config.isUnregisterMetricForDeletedStoreEnabled(); + clusterConfig.getChildDataCenterKafkaUrlMap().get(clusterConfig.getAggregateRealTimeSourceRegion()); + boolean unregisterMetricEnabled = clusterConfig.isUnregisterMetricForDeletedStoreEnabled(); this.pushMonitor = new PushMonitorDelegator( clusterName, @@ -163,9 +163,9 @@ public HelixVeniceClusterResources( realTimeTopicSwitcher, clusterLockManager, aggregateRealTimeSourceKafkaUrl, - getActiveActiveRealTimeSourceKafkaURLs(config), + getActiveActiveRealTimeSourceKafkaURLs(clusterConfig), helixAdminClient, - config, + clusterConfig, admin.getPushStatusStoreReader(), admin.getDisabledPartitionStats(clusterName)); @@ -175,16 +175,16 @@ public HelixVeniceClusterResources( storeMetadataRepository, admin, new AggPushStatusCleanUpStats(clusterName, metricsRepository, storeMetadataRepository, unregisterMetricEnabled), - this.config.getLeakedPushStatusCleanUpServiceSleepIntervalInMs(), - this.config.getLeakedResourceAllowedLingerTimeInMs()); + this.clusterConfig.getLeakedPushStatusCleanUpServiceSleepIntervalInMs(), + this.clusterConfig.getLeakedResourceAllowedLingerTimeInMs()); // On controller side, router cluster manager is used as an accessor without maintaining any cache, so do not need // to refresh once zk reconnected. 
this.routersClusterManager = new ZkRoutersClusterManager( zkClient, adapterSerializer, clusterName, - config.getRefreshAttemptsForZkReconnect(), - config.getRefreshIntervalForZkReconnectInMs()); + clusterConfig.getRefreshAttemptsForZkReconnect(), + clusterConfig.getRefreshIntervalForZkReconnectInMs()); this.aggPartitionHealthStats = new AggPartitionHealthStats( clusterName, metricsRepository, @@ -193,7 +193,7 @@ public HelixVeniceClusterResources( pushMonitor); this.storeConfigAccessor = new ZkStoreConfigAccessor(zkClient, adapterSerializer, metaStoreWriter); this.accessController = accessController; - if (config.getErrorPartitionAutoResetLimit() > 0) { + if (clusterConfig.getErrorPartitionAutoResetLimit() > 0) { errorPartitionResetTask = new ErrorPartitionResetTask( clusterName, helixAdminClient, @@ -201,24 +201,24 @@ public HelixVeniceClusterResources( routingDataRepository, pushMonitor, metricsRepository, - config.getErrorPartitionAutoResetLimit(), - config.getErrorPartitionProcessingCycleDelay()); + clusterConfig.getErrorPartitionAutoResetLimit(), + clusterConfig.getErrorPartitionProcessingCycleDelay()); } veniceAdminStats = new VeniceAdminStats(metricsRepository, "venice-admin-" + clusterName); this.storagePersonaRepository = new StoragePersonaRepository(clusterName, this.storeMetadataRepository, adapterSerializer, zkClient); } - private List getActiveActiveRealTimeSourceKafkaURLs(VeniceControllerConfig config) { - List kafkaURLs = new ArrayList<>(config.getActiveActiveRealTimeSourceFabrics().size()); - for (String fabric: config.getActiveActiveRealTimeSourceFabrics()) { - String kafkaURL = config.getChildDataCenterKafkaUrlMap().get(fabric); + private List getActiveActiveRealTimeSourceKafkaURLs(VeniceControllerClusterConfig clusterConfig) { + List kafkaURLs = new ArrayList<>(clusterConfig.getActiveActiveRealTimeSourceFabrics().size()); + for (String fabric: clusterConfig.getActiveActiveRealTimeSourceFabrics()) { + String kafkaURL = clusterConfig.getChildDataCenterKafkaUrlMap().get(fabric); if (kafkaURL == null) { throw new VeniceException( String.format( "No A/A source Kafka URL found for fabric %s in %s", fabric, - config.getChildDataCenterKafkaUrlMap())); + clusterConfig.getChildDataCenterKafkaUrlMap())); } kafkaURLs.add(kafkaURL); } @@ -241,12 +241,12 @@ private void repairStoreReplicationFactor(ReadWriteStoreRepository metadataRepos */ if (store.getReplicationFactor() <= 0) { int previousReplicationFactor = store.getReplicationFactor(); - store.setReplicationFactor(config.getReplicationFactor()); + store.setReplicationFactor(clusterConfig.getReplicationFactor()); metadataRepository.updateStore(store); LOGGER.info( "Updated replication factor from {} to {} for store: {}, in cluster: {}", previousReplicationFactor, - config.getReplicationFactor(), + clusterConfig.getReplicationFactor(), store.getName(), clusterName); } @@ -367,8 +367,8 @@ public SafeHelixManager getHelixManager() { return helixManager; } - public VeniceControllerConfig getConfig() { - return config; + public VeniceControllerClusterConfig getConfig() { + return clusterConfig; } public PushMonitorDelegator getPushMonitor() { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreGraveyardCleanupService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreGraveyardCleanupService.java index dab2358141..6275aa23fe 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreGraveyardCleanupService.java +++ 
b/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreGraveyardCleanupService.java @@ -65,7 +65,7 @@ public void run() { time.sleep((long) sleepIntervalBetweenListFetchMinutes * Time.MS_PER_MINUTE); // loop all clusters for (String clusterName: multiClusterConfig.getClusters()) { - VeniceControllerConfig clusterConfig = multiClusterConfig.getControllerConfig(clusterName); + VeniceControllerClusterConfig clusterConfig = multiClusterConfig.getControllerConfig(clusterName); boolean cleanupEnabled = clusterConfig.isStoreGraveyardCleanupEnabled(); int delayInMinutes = clusterConfig.getStoreGraveyardCleanupDelayMinutes(); if (!cleanupEnabled || !admin.isLeaderControllerFor(clusterName)) { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java index 037695f360..44468d0828 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java @@ -47,14 +47,14 @@ public UserSystemStoreLifeCycleHelper( this.parentAdmin = parentAdmin; this.authorizerService = authorizerService; for (String cluster: multiClusterConfig.getClusters()) { - VeniceControllerConfig controllerConfig = multiClusterConfig.getControllerConfig(cluster); + VeniceControllerClusterConfig clusterConfig = multiClusterConfig.getControllerConfig(cluster); Set autoCreateEnabledSystemStores = new HashSet<>(); - if (controllerConfig.isZkSharedMetaSystemSchemaStoreAutoCreationEnabled() - && controllerConfig.isAutoMaterializeMetaSystemStoreEnabled()) { + if (clusterConfig.isZkSharedMetaSystemSchemaStoreAutoCreationEnabled() + && clusterConfig.isAutoMaterializeMetaSystemStoreEnabled()) { autoCreateEnabledSystemStores.add(VeniceSystemStoreType.META_STORE); } - if (controllerConfig.isZkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled() - && controllerConfig.isAutoMaterializeDaVinciPushStatusSystemStoreEnabled()) { + if (clusterConfig.isZkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled() + && clusterConfig.isAutoMaterializeDaVinciPushStatusSystemStoreEnabled()) { autoCreateEnabledSystemStores.add(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE); } clusterToAutoCreateEnabledSystemStoresMap.put(cluster, autoCreateEnabledSystemStores); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java index 0308d8fda1..c3cb4d707b 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java @@ -258,7 +258,8 @@ public void start() { private void initializeSystemSchema(Admin admin) { String systemStoreCluster = multiClusterConfigs.getSystemSchemaClusterName(); - VeniceControllerConfig systemStoreClusterConfig = multiClusterConfigs.getControllerConfig(systemStoreCluster); + VeniceControllerClusterConfig systemStoreClusterConfig = + multiClusterConfigs.getControllerConfig(systemStoreCluster); if (!multiClusterConfigs.isParent() && systemStoreClusterConfig.isSystemSchemaInitializationAtStartTimeEnabled()) { String regionName = systemStoreClusterConfig.getRegionName(); String childControllerUrl = 
systemStoreClusterConfig.getChildControllerUrl(regionName); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java similarity index 99% rename from services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java rename to services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java index 1aa719708b..1a74b76d4a 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java @@ -223,8 +223,8 @@ /** * Configuration which is specific to a Venice controller. */ -public class VeniceControllerConfig { - private static final Logger LOGGER = LogManager.getLogger(VeniceControllerConfig.class); +public class VeniceControllerClusterConfig { + private static final Logger LOGGER = LogManager.getLogger(VeniceControllerClusterConfig.class); private static final String LIST_SEPARATOR = ",\\s*"; private final VeniceProperties props; @@ -573,7 +573,7 @@ public class VeniceControllerConfig { private final String childDatacenters; - public VeniceControllerConfig(VeniceProperties props) { + public VeniceControllerClusterConfig(VeniceProperties props) { this.props = props; this.clusterName = props.getString(CLUSTER_NAME); this.zkAddress = props.getString(ZOOKEEPER_ADDRESS); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java index 3b3b0f4fd3..e926c25f16 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java @@ -16,26 +16,26 @@ public class VeniceControllerMultiClusterConfig { - private final Map clusterToControllerConfigMap; + private final Map clusterToControllerConfigMap; public VeniceControllerMultiClusterConfig(Collection controllerClusterProperties) { clusterToControllerConfigMap = new HashMap<>(); for (VeniceProperties properties: controllerClusterProperties) { - final VeniceControllerConfig controllerConfig = new VeniceControllerConfig(properties); - clusterToControllerConfigMap.put(controllerConfig.getClusterName(), controllerConfig); + final VeniceControllerClusterConfig clusterConfig = new VeniceControllerClusterConfig(properties); + clusterToControllerConfigMap.put(clusterConfig.getClusterName(), clusterConfig); } } // This contructor is used for testing. 
- public VeniceControllerMultiClusterConfig(Map clusterToControllerConfigMap) { + public VeniceControllerMultiClusterConfig(Map clusterToControllerConfigMap) { this.clusterToControllerConfigMap = new HashMap<>(clusterToControllerConfigMap); } - public void addClusterConfig(VeniceControllerConfig controllerConfig) { - clusterToControllerConfigMap.put(controllerConfig.getClusterName(), controllerConfig); + public void addClusterConfig(VeniceControllerClusterConfig clusterConfig) { + clusterToControllerConfigMap.put(clusterConfig.getClusterName(), clusterConfig); } - public VeniceControllerConfig getControllerConfig(String clusterName) { + public VeniceControllerClusterConfig getControllerConfig(String clusterName) { if (clusterToControllerConfigMap.containsKey(clusterName)) { return clusterToControllerConfigMap.get(clusterName); } else { @@ -195,7 +195,7 @@ public int getParentControllerMaxErroredTopicNumToKeep() { return getCommonConfig().getParentControllerMaxErroredTopicNumToKeep(); } - public VeniceControllerConfig getCommonConfig() { + public VeniceControllerClusterConfig getCommonConfig() { return clusterToControllerConfigMap.values().iterator().next(); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java index 2d102226f3..83483589d0 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java @@ -133,7 +133,7 @@ public VeniceControllerService( * consumes the admin topics. */ String systemClusterName = multiClusterConfigs.getSystemSchemaClusterName(); - VeniceControllerConfig systemStoreClusterConfig = multiClusterConfigs.getControllerConfig(systemClusterName); + VeniceControllerClusterConfig systemStoreClusterConfig = multiClusterConfigs.getControllerConfig(systemClusterName); newSchemaEncountered = (schemaId, schema) -> { LOGGER.info("Encountered a new KME value schema (id = {}), proceed to register", schemaId); try { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerStateModel.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerStateModel.java index ab9c05ee95..d4bb84adb2 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerStateModel.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerStateModel.java @@ -47,7 +47,7 @@ public class VeniceControllerStateModel extends StateModel { private final HelixAdminClient helixAdminClient; private final RealTimeTopicSwitcher realTimeTopicSwitcher; - private VeniceControllerConfig clusterConfig; + private VeniceControllerClusterConfig clusterConfig; private SafeHelixManager helixManager; private HelixVeniceClusterResources clusterResources; diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 550dc99bdb..d9fd5618d6 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -459,7 +459,7 @@ public VeniceHelixAdmin( List 
additionalInitRoutines) { Validate.notNull(d2Client); this.multiClusterConfigs = multiClusterConfigs; - VeniceControllerConfig commonConfig = multiClusterConfigs.getCommonConfig(); + VeniceControllerClusterConfig commonConfig = multiClusterConfigs.getCommonConfig(); this.controllerName = Utils.getHelixNodeIdentifier(multiClusterConfigs.getAdminHostname(), multiClusterConfigs.getAdminPort()); this.controllerClusterName = multiClusterConfigs.getControllerClusterName(); @@ -701,28 +701,28 @@ public void handleDeletedInstances(Set deletedInstances) { } private VeniceProperties getPubSubSSLPropertiesFromControllerConfig(String pubSubBootstrapServers) { - VeniceControllerConfig controllerConfig = multiClusterConfigs.getCommonConfig(); + VeniceControllerClusterConfig clusterConfig = multiClusterConfigs.getCommonConfig(); - VeniceProperties originalPros = controllerConfig.getProps(); - Properties clonedProperties = originalPros.toProperties(); - if (originalPros.getBooleanWithAlternative(KAFKA_OVER_SSL, SSL_TO_KAFKA_LEGACY, false)) { + VeniceProperties originalProps = clusterConfig.getProps(); + Properties clonedProperties = originalProps.toProperties(); + if (originalProps.getBooleanWithAlternative(KAFKA_OVER_SSL, SSL_TO_KAFKA_LEGACY, false)) { clonedProperties.setProperty(SSL_KAFKA_BOOTSTRAP_SERVERS, pubSubBootstrapServers); } else { clonedProperties.setProperty(KAFKA_BOOTSTRAP_SERVERS, pubSubBootstrapServers); } - controllerConfig = new VeniceControllerConfig(new VeniceProperties(clonedProperties)); + clusterConfig = new VeniceControllerClusterConfig(new VeniceProperties(clonedProperties)); Properties properties = multiClusterConfigs.getCommonConfig().getProps().getPropertiesCopy(); - ApacheKafkaProducerConfig.copyKafkaSASLProperties(originalPros, properties, false); - if (KafkaSSLUtils.isKafkaSSLProtocol(controllerConfig.getKafkaSecurityProtocol())) { - Optional sslConfig = controllerConfig.getSslConfig(); + ApacheKafkaProducerConfig.copyKafkaSASLProperties(originalProps, properties, false); + if (KafkaSSLUtils.isKafkaSSLProtocol(clusterConfig.getKafkaSecurityProtocol())) { + Optional sslConfig = clusterConfig.getSslConfig(); if (!sslConfig.isPresent()) { throw new VeniceException("SSLConfig should be present when Kafka SSL is enabled"); } properties.putAll(sslConfig.get().getKafkaSSLConfig()); - properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, controllerConfig.getKafkaSecurityProtocol()); - properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, controllerConfig.getSslKafkaBootstrapServers()); + properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clusterConfig.getKafkaSecurityProtocol()); + properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, clusterConfig.getSslKafkaBootstrapServers()); } else { - properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, controllerConfig.getKafkaBootstrapServers()); + properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, clusterConfig.getKafkaBootstrapServers()); } return new VeniceProperties(properties); } @@ -938,16 +938,16 @@ public void createStore( LOGGER.info("Start creating store {} in cluster {} with owner {}", storeName, clusterName, owner); try (AutoCloseableLock ignore = clusterResources.getClusterLockManager().createStoreWriteLock(storeName)) { checkPreConditionForCreateStore(clusterName, storeName, keySchema, valueSchema, isSystemStore, true); - VeniceControllerConfig config = getHelixVeniceClusterResources(clusterName).getConfig(); + VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); 
Store newStore = new ZKStore( storeName, owner, System.currentTimeMillis(), - config.getPersistenceType(), - config.getRoutingStrategy(), - config.getReadStrategy(), - config.getOfflinePushStrategy(), - config.getReplicationFactor()); + clusterConfig.getPersistenceType(), + clusterConfig.getRoutingStrategy(), + clusterConfig.getReadStrategy(), + clusterConfig.getOfflinePushStrategy(), + clusterConfig.getReplicationFactor()); ReadWriteStoreRepository storeRepo = clusterResources.getStoreMetadataRepository(); Store existingStore = storeRepo.getStore(storeName); @@ -974,7 +974,7 @@ public void createStore( storeName, largestUsedStoreVersion); } - configureNewStore(newStore, config, largestUsedStoreVersion); + configureNewStore(newStore, clusterConfig, largestUsedStoreVersion); storeRepo.addStore(newStore); // Create global config for that store. @@ -996,18 +996,21 @@ public void createStore( } } - private void configureNewStore(Store newStore, VeniceControllerConfig config, int largestUsedVersionNumber) { - newStore.setNativeReplicationEnabled(config.isNativeReplicationEnabledAsDefaultForBatchOnly()); + private void configureNewStore( + Store newStore, + VeniceControllerClusterConfig clusterConfig, + int largestUsedVersionNumber) { + newStore.setNativeReplicationEnabled(clusterConfig.isNativeReplicationEnabledAsDefaultForBatchOnly()); newStore.setActiveActiveReplicationEnabled( - config.isActiveActiveReplicationEnabledAsDefaultForBatchOnly() && !newStore.isSystemStore()); + clusterConfig.isActiveActiveReplicationEnabledAsDefaultForBatchOnly() && !newStore.isSystemStore()); /** * Initialize default NR source fabric base on default config for different store types. */ if (newStore.isHybrid()) { - newStore.setNativeReplicationSourceFabric(config.getNativeReplicationSourceFabricAsDefaultForHybrid()); + newStore.setNativeReplicationSourceFabric(clusterConfig.getNativeReplicationSourceFabricAsDefaultForHybrid()); } else { - newStore.setNativeReplicationSourceFabric(config.getNativeReplicationSourceFabricAsDefaultForBatchOnly()); + newStore.setNativeReplicationSourceFabric(clusterConfig.getNativeReplicationSourceFabricAsDefaultForBatchOnly()); } newStore.setLargestUsedVersionNumber(largestUsedVersionNumber); } @@ -1579,24 +1582,20 @@ private List getVersionsToMigrate( public Map getControllerClientMap(String clusterName) { return clusterControllerClientPerColoMap.computeIfAbsent(clusterName, cn -> { Map controllerClients = new HashMap<>(); - VeniceControllerConfig controllerConfig = multiClusterConfigs.getControllerConfig(clusterName); - controllerConfig.getChildDataCenterControllerUrlMap() + VeniceControllerClusterConfig clusterConfig = multiClusterConfigs.getControllerConfig(clusterName); + clusterConfig.getChildDataCenterControllerUrlMap() .entrySet() .forEach( entry -> controllerClients.put( entry.getKey(), ControllerClient.constructClusterControllerClient(clusterName, entry.getValue(), sslFactory))); - controllerConfig.getChildDataCenterControllerD2Map() + clusterConfig.getChildDataCenterControllerD2Map() .entrySet() .forEach( entry -> controllerClients.put( entry.getKey(), - new D2ControllerClient( - controllerConfig.getD2ServiceName(), - clusterName, - entry.getValue(), - sslFactory))); + new D2ControllerClient(clusterConfig.getD2ServiceName(), clusterName, entry.getValue(), sslFactory))); return controllerClients; }); @@ -2081,7 +2080,7 @@ public Version addVersionOnly( version.setPushType(pushType); store.addVersion(version); // Apply cluster-level native replication configs - 
VeniceControllerConfig clusterConfig = resources.getConfig(); + VeniceControllerClusterConfig clusterConfig = resources.getConfig(); boolean nativeReplicationEnabled = version.isNativeReplicationEnabled(); @@ -2180,7 +2179,7 @@ public void createSpecificVersionTopic(String clusterName, String storeName, Ver } checkControllerLeadershipFor(clusterName); try { - VeniceControllerConfig controllerConfig = getHelixVeniceClusterResources(clusterName).getConfig(); + VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); int amplificationFactor = version.getPartitionerConfig().getAmplificationFactor(); topicToCreationTime.computeIfAbsent(version.kafkaTopicName(), topic -> System.currentTimeMillis()); createBatchTopics( @@ -2188,7 +2187,7 @@ public void createSpecificVersionTopic(String clusterName, String storeName, Ver version.getPushType(), getTopicManager(), version.getPartitionCount() * amplificationFactor, - controllerConfig, + clusterConfig, false); } finally { topicToCreationTime.remove(version.kafkaTopicName()); @@ -2301,7 +2300,7 @@ private void createBatchTopics( PushType pushType, TopicManager topicManager, int partitionCount, - VeniceControllerConfig clusterConfig, + VeniceControllerClusterConfig clusterConfig, boolean useFastKafkaOperationTimeout) { List topicNamesToCreate = new ArrayList<>(2); topicNamesToCreate.add(pubSubTopicRepository.getTopic(version.kafkaTopicName())); @@ -2457,7 +2456,7 @@ private Pair addVersion( OfflinePushStrategy offlinePushStrategy; int currentVersionBeforePush = -1; boolean isRepush = Version.isPushIdRePush(pushJobId); - VeniceControllerConfig clusterConfig = resources.getConfig(); + VeniceControllerClusterConfig clusterConfig = resources.getConfig(); BackupStrategy backupStrategy; try { @@ -2865,7 +2864,7 @@ public Version incrementVersionIdempotent( "Request of creating versions/topics for targeted region push should only be sent to parent controller"); } checkControllerLeadershipFor(clusterName); - VeniceControllerConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); + VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); int replicationMetadataVersionId = clusterConfig.getReplicationMetadataVersion(); return pushType.isIncremental() ? getIncrementalPushVersion(clusterName, storeName) @@ -2997,7 +2996,7 @@ public String getRealTimeTopic(String clusterName, String storeName) { } } - VeniceControllerConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); + VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); getTopicManager().createTopic( realTimeTopic, partitionCount, @@ -3152,7 +3151,8 @@ public RepushInfo getRepushInfo(String clusterName, String storeName, Optional { preCheckStorePartitionCountUpdate(clusterName, store, partitionCount); // Do not update the partitionCount on the store.version as version config is immutable. 
The @@ -4043,7 +4043,7 @@ public void setStorePartitionCount(String clusterName, String storeName, int par void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newPartitionCount) { String errorMessagePrefix = "Store update error for " + store.getName() + " in cluster: " + clusterName + ": "; - VeniceControllerConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); + VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); if (store.isHybrid() && store.getPartitionCount() != newPartitionCount) { // Allow the update if partition count is not configured and the new partition count matches RT partition count if (store.getPartitionCount() == 0) { @@ -4261,22 +4261,22 @@ private void setRmdChunkingEnabled(String clusterName, String storeName, boolean void setIncrementalPushEnabled(String clusterName, String storeName, boolean incrementalPushEnabled) { storeMetadataUpdate(clusterName, storeName, store -> { - VeniceControllerConfig config = getHelixVeniceClusterResources(clusterName).getConfig(); + VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); if (incrementalPushEnabled || store.isHybrid()) { // Enabling incremental push - store.setNativeReplicationEnabled(config.isNativeReplicationEnabledAsDefaultForHybrid()); - store.setNativeReplicationSourceFabric(config.getNativeReplicationSourceFabricAsDefaultForHybrid()); + store.setNativeReplicationEnabled(clusterConfig.isNativeReplicationEnabledAsDefaultForHybrid()); + store.setNativeReplicationSourceFabric(clusterConfig.getNativeReplicationSourceFabricAsDefaultForHybrid()); store.setActiveActiveReplicationEnabled( store.isActiveActiveReplicationEnabled() - || (config.isActiveActiveReplicationEnabledAsDefaultForHybrid() && !store.isSystemStore())); + || (clusterConfig.isActiveActiveReplicationEnabledAsDefaultForHybrid() && !store.isSystemStore())); } else { // Disabling incremental push // This is only possible when hybrid settings are set to null before turning of incremental push for the store. 
- store.setNativeReplicationEnabled(config.isNativeReplicationEnabledAsDefaultForBatchOnly()); - store.setNativeReplicationSourceFabric(config.getNativeReplicationSourceFabricAsDefaultForBatchOnly()); + store.setNativeReplicationEnabled(clusterConfig.isNativeReplicationEnabledAsDefaultForBatchOnly()); + store.setNativeReplicationSourceFabric(clusterConfig.getNativeReplicationSourceFabricAsDefaultForBatchOnly()); store.setActiveActiveReplicationEnabled( store.isActiveActiveReplicationEnabled() - || (config.isActiveActiveReplicationEnabledAsDefaultForBatchOnly() && !store.isSystemStore())); + || (clusterConfig.isActiveActiveReplicationEnabledAsDefaultForBatchOnly() && !store.isSystemStore())); } store.setIncrementalPushEnabled(incrementalPushEnabled); @@ -4644,7 +4644,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName); ZkRoutersClusterManager routersClusterManager = resources.getRoutersClusterManager(); int routerCount = routersClusterManager.getLiveRoutersCount(); - VeniceControllerConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); + VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); int defaultReadQuotaPerRouter = clusterConfig.getDefaultReadQuotaPerRouter(); if (Math.max(defaultReadQuotaPerRouter, routerCount * defaultReadQuotaPerRouter) < readQuotaInCU.get()) { @@ -4667,7 +4667,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto setBootstrapToOnlineTimeoutInHours(clusterName, storeName, bootstrapToOnlineTimeoutInHours.get()); } - VeniceControllerConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); + VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); if (newHybridStoreConfig.isPresent()) { // To fix the final variable problem in the lambda expression final HybridStoreConfig finalHybridConfig = newHybridStoreConfig.get(); @@ -5301,8 +5301,8 @@ public SchemaEntry getValueSchema(String clusterName, String storeName, int id) } private void validateValueSchemaUsingRandomGenerator(String schemaStr, String clusterName, String storeName) { - VeniceControllerConfig config = getHelixVeniceClusterResources(clusterName).getConfig(); - if (!config.isControllerSchemaValidationEnabled()) { + VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); + if (!clusterConfig.isControllerSchemaValidationEnabled()) { return; } @@ -6100,12 +6100,12 @@ private void createClusterIfRequired(String clusterName) { return; } - VeniceControllerConfig config = multiClusterConfigs.getControllerConfig(clusterName); + VeniceControllerClusterConfig clusterConfig = multiClusterConfigs.getControllerConfig(clusterName); HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(clusterName).build(); Map helixClusterProperties = new HashMap<>(); helixClusterProperties.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, String.valueOf(true)); - long delayedTime = config.getDelayToRebalanceMS(); + long delayedTime = clusterConfig.getDelayToRebalanceMS(); if (delayedTime > 0) { helixClusterProperties .put(ClusterConfig.ClusterConfigProperty.DELAY_REBALANCE_TIME.name(), String.valueOf(delayedTime)); @@ -6336,16 +6336,16 @@ public int calculateNumberOfPartitions(String clusterName, String storeName) { 
checkControllerLeadershipFor(clusterName); HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName); Store store = resources.getStoreMetadataRepository().getStoreOrThrow(storeName); - VeniceControllerConfig config = resources.getConfig(); + VeniceControllerClusterConfig clusterConfig = resources.getConfig(); return PartitionUtils.calculatePartitionCount( storeName, store.getStorageQuotaInByte(), store.getPartitionCount(), - config.getPartitionSize(), - config.getMinNumberOfPartitions(), - config.getMaxNumberOfPartitions(), - config.isPartitionCountRoundUpEnabled(), - config.getPartitionCountRoundUpSize()); + clusterConfig.getPartitionSize(), + clusterConfig.getMinNumberOfPartitions(), + clusterConfig.getMaxNumberOfPartitions(), + clusterConfig.isPartitionCountRoundUpEnabled(), + clusterConfig.getPartitionCountRoundUpSize()); } /** @@ -7058,8 +7058,8 @@ public HelixVeniceClusterResources getHelixVeniceClusterResources(String cluster return resources.get(); } - void addConfig(VeniceControllerConfig config) { - multiClusterConfigs.addClusterConfig(config); + void addConfig(VeniceControllerClusterConfig clusterConfig) { + multiClusterConfigs.addClusterConfig(clusterConfig); } String getControllerName() { @@ -7707,7 +7707,7 @@ public boolean isParent() { @Override public Map getChildDataCenterControllerUrlMap(String clusterName) { /** - * According to {@link VeniceControllerConfig#VeniceControllerConfig(VeniceProperties)}, the map is empty + * According to {@link VeniceControllerClusterConfig#VeniceControllerClusterConfig(VeniceProperties)}, the map is empty * if this is a child controller. */ return multiClusterConfigs.getControllerConfig(clusterName).getChildDataCenterControllerUrlMap(); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 31a6220fa5..ad4d6b49b2 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -437,13 +437,13 @@ public VeniceParentHelixAdmin( } } for (String cluster: this.multiClusterConfigs.getClusters()) { - VeniceControllerConfig config = this.multiClusterConfigs.getControllerConfig(cluster); + VeniceControllerClusterConfig clusterConfig = this.multiClusterConfigs.getControllerConfig(cluster); adminCommandExecutionTrackers.put( cluster, new AdminCommandExecutionTracker( - config.getClusterName(), + clusterConfig.getClusterName(), this.veniceHelixAdmin.getExecutionIdAccessor(), - this.veniceHelixAdmin.getControllerClientMap(config.getClusterName()))); + this.veniceHelixAdmin.getControllerClientMap(clusterConfig.getClusterName()))); perStoreAdminLocks.put(cluster, new ConcurrentHashMap<>()); perClusterAdminLocks.put(cluster, new ReentrantLock()); } @@ -1042,11 +1042,11 @@ private int getRmdVersionID(final String storeName, final String clusterName) { return store.getRmdVersion(); } - final VeniceControllerConfig controllerConfig = getMultiClusterConfigs().getControllerConfig(clusterName); - if (controllerConfig == null) { + final VeniceControllerClusterConfig clusterConfig = getMultiClusterConfigs().getControllerConfig(clusterName); + if (clusterConfig == null) { throw new VeniceException("No controller cluster config found for cluster " + clusterName); } - final int rmdVersionID = 
controllerConfig.getReplicationMetadataVersion(); + final int rmdVersionID = clusterConfig.getReplicationMetadataVersion(); LOGGER.info("Use RMD version ID {} for cluster {}", rmdVersionID, clusterName); return rmdVersionID; } @@ -1763,7 +1763,8 @@ public Map getCurrentVersionsForMultiColos(String clusterName, public RepushInfo getRepushInfo(String clusterName, String storeName, Optional fabricName) { Map controllerClients = getVeniceHelixAdmin().getControllerClientMap(clusterName); String systemSchemaClusterName = multiClusterConfigs.getSystemSchemaClusterName(); - VeniceControllerConfig systemSchemaClusterConfig = multiClusterConfigs.getControllerConfig(systemSchemaClusterName); + VeniceControllerClusterConfig systemSchemaClusterConfig = + multiClusterConfigs.getControllerConfig(systemSchemaClusterName); if (fabricName.isPresent()) { StoreResponse response = controllerClients.get(fabricName.get()).getStore(storeName); @@ -2399,8 +2400,8 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa hybridDataReplicationPolicy, hybridBufferReplayPolicy); - // Get VeniceControllerConfig for the cluster - VeniceControllerConfig controllerConfig = + // Get VeniceControllerClusterConfig for the cluster + VeniceControllerClusterConfig clusterConfig = veniceHelixAdmin.getHelixVeniceClusterResources(clusterName).getConfig(); // Check if the store is being converted to a hybrid store boolean storeBeingConvertedToHybrid = !currStore.isHybrid() && updatedHybridStoreConfig != null @@ -2426,7 +2427,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa // Enable active-active replication automatically when batch user store being converted to hybrid store and // active-active replication is enabled for all hybrid store via the cluster config if (storeBeingConvertedToHybrid && !setStore.activeActiveReplicationEnabled && !currStore.isSystemStore() - && controllerConfig.isActiveActiveReplicationEnabledAsDefaultForHybrid()) { + && clusterConfig.isActiveActiveReplicationEnabledAsDefaultForHybrid()) { setStore.activeActiveReplicationEnabled = true; updatedConfigsList.add(ACTIVE_ACTIVE_REPLICATION_ENABLED); if (!hybridDataReplicationPolicy.isPresent()) { @@ -2453,7 +2454,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa // replication is enabled or being and the cluster config allows it. if (!setStore.incrementalPushEnabled && !currStore.isSystemStore() && storeBeingConvertedToHybrid && setStore.activeActiveReplicationEnabled - && controllerConfig.enabledIncrementalPushForHybridActiveActiveUserStores()) { + && clusterConfig.enabledIncrementalPushForHybridActiveActiveUserStores()) { setStore.incrementalPushEnabled = true; updatedConfigsList.add(INCREMENTAL_PUSH_ENABLED); } @@ -2682,17 +2683,16 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa if (!getVeniceHelixAdmin().isHybrid(currStore.getHybridStoreConfig()) && getVeniceHelixAdmin().isHybrid(setStore.getHybridStoreConfig()) && setStore.getPartitionNum() == 0) { // This is a new hybrid store and partition count is not specified. 
- VeniceControllerConfig config = getVeniceHelixAdmin().getHelixVeniceClusterResources(clusterName).getConfig(); setStore.setPartitionNum( PartitionUtils.calculatePartitionCount( storeName, setStore.getStorageQuotaInByte(), 0, - config.getPartitionSize(), - config.getMinNumberOfPartitionsForHybrid(), - config.getMaxNumberOfPartitions(), - config.isPartitionCountRoundUpEnabled(), - config.getPartitionCountRoundUpSize())); + clusterConfig.getPartitionSize(), + clusterConfig.getMinNumberOfPartitionsForHybrid(), + clusterConfig.getMaxNumberOfPartitions(), + clusterConfig.isPartitionCountRoundUpEnabled(), + clusterConfig.getPartitionCountRoundUpSize())); LOGGER.info( "Enforcing default hybrid partition count:{} for a new hybrid store:{}.", setStore.getPartitionNum(), @@ -5275,13 +5275,13 @@ private ControllerClient getFabricBuildoutControllerClient(String clusterName, S ControllerClient value = newFabricControllerClientMap.computeIfAbsent(clusterName, cn -> new VeniceConcurrentHashMap<>()) .computeIfAbsent(fabric, f -> { - VeniceControllerConfig controllerConfig = multiClusterConfigs.getControllerConfig(clusterName); - String d2ZkHost = controllerConfig.getChildControllerD2ZkHost(fabric); - String d2ServiceName = controllerConfig.getD2ServiceName(); + VeniceControllerClusterConfig clusterConfig = multiClusterConfigs.getControllerConfig(clusterName); + String d2ZkHost = clusterConfig.getChildControllerD2ZkHost(fabric); + String d2ServiceName = clusterConfig.getD2ServiceName(); if (StringUtils.isNotBlank(d2ZkHost) && StringUtils.isNotBlank(d2ServiceName)) { return new D2ControllerClient(d2ServiceName, clusterName, d2ZkHost, sslFactory); } - String url = controllerConfig.getChildControllerUrl(fabric); + String url = clusterConfig.getChildControllerUrl(fabric); if (StringUtils.isNotBlank(url)) { return ControllerClient.constructClusterControllerClient(clusterName, url, sslFactory); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkHelixAdminClient.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkHelixAdminClient.java index a0bfb452eb..a5e4ce29d9 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkHelixAdminClient.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkHelixAdminClient.java @@ -250,13 +250,13 @@ public void createVeniceStorageClusterResources( LeaderStandbySMD.name, IdealState.RebalanceMode.FULL_AUTO.toString(), AutoRebalanceStrategy.class.getName()); - VeniceControllerConfig config = multiClusterConfigs.getControllerConfig(clusterName); + VeniceControllerClusterConfig clusterConfig = multiClusterConfigs.getControllerConfig(clusterName); IdealState idealState = helixAdmin.getResourceIdealState(clusterName, kafkaTopic); // We don't set the delayed time per resource, we will use the cluster level helix config to decide // the delayed rebalance time idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName()); idealState.setMinActiveReplicas(replicationFactor - 1); - idealState.setRebalanceStrategy(config.getHelixRebalanceAlg()); + idealState.setRebalanceStrategy(clusterConfig.getHelixRebalanceAlg()); helixAdmin.setResourceIdealState(clusterName, kafkaTopic, idealState); LOGGER.info("Enabled delayed re-balance for resource: {}", kafkaTopic); helixAdmin.rebalance(clusterName, kafkaTopic, replicationFactor); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java 
b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java index 9fa13521cb..7cf29983c8 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java @@ -5,7 +5,7 @@ import static com.linkedin.venice.ConfigKeys.KAFKA_ENABLE_AUTO_COMMIT_CONFIG; import com.linkedin.venice.controller.AdminTopicMetadataAccessor; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controller.ZkAdminTopicMetadataAccessor; import com.linkedin.venice.controller.stats.AdminConsumptionStats; @@ -30,7 +30,7 @@ public class AdminConsumerService extends AbstractVeniceService { private static final long WAITING_TIME_FOR_STOP_IN_MS = 5000; - private final VeniceControllerConfig config; + private final VeniceControllerClusterConfig clusterConfig; private final VeniceHelixAdmin admin; private final ZkAdminTopicMetadataAccessor adminTopicMetadataAccessor; private final PubSubConsumerAdapterFactory consumerFactory; @@ -48,21 +48,21 @@ public class AdminConsumerService extends AbstractVeniceService { public AdminConsumerService( VeniceHelixAdmin admin, - VeniceControllerConfig config, + VeniceControllerClusterConfig clusterConfig, MetricsRepository metricsRepository, PubSubTopicRepository pubSubTopicRepository, PubSubMessageDeserializer pubSubMessageDeserializer) { - this.config = config; + this.clusterConfig = clusterConfig; this.admin = admin; this.adminTopicMetadataAccessor = new ZkAdminTopicMetadataAccessor(admin.getZkClient(), admin.getAdapterSerializer()); this.metricsRepository = metricsRepository; - this.remoteConsumptionEnabled = config.isAdminTopicRemoteConsumptionEnabled(); + this.remoteConsumptionEnabled = clusterConfig.isAdminTopicRemoteConsumptionEnabled(); this.pubSubTopicRepository = pubSubTopicRepository; this.pubSubMessageDeserializer = pubSubMessageDeserializer; if (remoteConsumptionEnabled) { - String adminTopicSourceRegion = config.getAdminTopicSourceRegion(); - remoteKafkaServerUrl = Optional.of(config.getChildDataCenterKafkaUrlMap().get(adminTopicSourceRegion)); + String adminTopicSourceRegion = clusterConfig.getAdminTopicSourceRegion(); + remoteKafkaServerUrl = Optional.of(clusterConfig.getChildDataCenterKafkaUrlMap().get(adminTopicSourceRegion)); } else { remoteKafkaServerUrl = Optional.empty(); } @@ -72,7 +72,7 @@ public AdminConsumerService( @Override public boolean startInner() throws Exception { - String clusterName = config.getClusterName(); + String clusterName = clusterConfig.getClusterName(); consumerTask = getAdminConsumptionTaskForCluster(clusterName); consumerThread = threadFactory.newThread(consumerTask); consumerThread.start(); @@ -102,22 +102,22 @@ private AdminConsumptionTask getAdminConsumptionTaskForCluster(String clusterNam admin, adminTopicMetadataAccessor, admin.getExecutionIdAccessor(), - config.isParent(), + clusterConfig.isParent(), new AdminConsumptionStats(metricsRepository, clusterName + "-admin_consumption_task"), - config.getAdminTopicReplicationFactor(), - config.getMinInSyncReplicasAdminTopics(), - config.getAdminConsumptionCycleTimeoutMs(), - config.getAdminConsumptionMaxWorkerThreadPoolSize(), + clusterConfig.getAdminTopicReplicationFactor(), + 
clusterConfig.getMinInSyncReplicasAdminTopics(), + clusterConfig.getAdminConsumptionCycleTimeoutMs(), + clusterConfig.getAdminConsumptionMaxWorkerThreadPoolSize(), pubSubTopicRepository, pubSubMessageDeserializer, - config.getRegionName()); + clusterConfig.getRegionName()); } /** * Skip admin message with specified offset for the given cluster. */ public void setOffsetToSkip(String clusterName, long offset, boolean skipDIV) { - if (clusterName.equals(config.getClusterName())) { + if (clusterName.equals(clusterConfig.getClusterName())) { if (skipDIV) { consumerTask.skipMessageDIVWithOffset(offset); } else { @@ -125,7 +125,7 @@ public void setOffsetToSkip(String clusterName, long offset, boolean skipDIV) { } } else { throw new VeniceException( - "This AdminConsumptionService is for cluster " + config.getClusterName() + "This AdminConsumptionService is for cluster " + clusterConfig.getClusterName() + ". Cannot skip admin message with offset " + offset + " for cluster " + clusterName); } } @@ -136,11 +136,11 @@ public void setOffsetToSkip(String clusterName, long offset, boolean skipDIV) { * @return last succeeded execution id for the given cluster. */ public Long getLastSucceededExecutionIdInCluster(String clusterName) { - if (clusterName.equals(config.getClusterName())) { + if (clusterName.equals(clusterConfig.getClusterName())) { return consumerTask.getLastSucceededExecutionId(); } else { throw new VeniceException( - "This AdminConsumptionService is for cluster: " + config.getClusterName() + "This AdminConsumptionService is for cluster: " + clusterConfig.getClusterName() + ". Cannot get the last succeed execution Id for cluster: " + clusterName); } } @@ -174,11 +174,11 @@ public long getFailingOffset() { * @return cluster-level execution id, offset, and upstream offset in a child colo. */ public Map getAdminTopicMetadata(String clusterName) { - if (clusterName.equals(config.getClusterName())) { + if (clusterName.equals(clusterConfig.getClusterName())) { return adminTopicMetadataAccessor.getMetadata(clusterName); } else { throw new VeniceException( - "This AdminConsumptionService is for cluster: " + config.getClusterName() + "This AdminConsumptionService is for cluster: " + clusterConfig.getClusterName() + ". Cannot get the last succeed execution Id for cluster: " + clusterName); } } @@ -187,12 +187,12 @@ public Map getAdminTopicMetadata(String clusterName) { * Update cluster-level execution id, offset, and upstream offset in a child colo. */ public void updateAdminTopicMetadata(String clusterName, long executionId, long offset, long upstreamOffset) { - if (clusterName.equals(config.getClusterName())) { + if (clusterName.equals(clusterConfig.getClusterName())) { Map metadata = AdminTopicMetadataAccessor.generateMetadataMap(offset, upstreamOffset, executionId); adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); } else { throw new VeniceException( - "This AdminConsumptionService is for cluster: " + config.getClusterName() + "This AdminConsumptionService is for cluster: " + clusterConfig.getClusterName() + ". 
Cannot get the last succeed execution Id for cluster: " + clusterName); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/util/ParentControllerConfigUpdateUtils.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/util/ParentControllerConfigUpdateUtils.java index fc2252da1b..e8ee519dd7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/util/ParentControllerConfigUpdateUtils.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/util/ParentControllerConfigUpdateUtils.java @@ -1,6 +1,6 @@ package com.linkedin.venice.controller.util; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.VeniceParentHelixAdmin; import com.linkedin.venice.controller.kafka.protocol.admin.UpdateStore; import com.linkedin.venice.exceptions.VeniceException; @@ -45,7 +45,7 @@ public static boolean checkAndMaybeApplyPartialUpdateConfig( UpdateStore setStore, boolean storeBeingConvertedToHybrid) { Store currentStore = parentHelixAdmin.getVeniceHelixAdmin().getStore(clusterName, storeName); - VeniceControllerConfig controllerConfig = + VeniceControllerClusterConfig clusterConfig = parentHelixAdmin.getVeniceHelixAdmin().getHelixVeniceClusterResources(clusterName).getConfig(); boolean partialUpdateConfigChanged = false; setStore.writeComputationEnabled = currentStore.isWriteComputationEnabled(); @@ -69,8 +69,8 @@ public static boolean checkAndMaybeApplyPartialUpdateConfig( */ final boolean shouldEnablePartialUpdateBasedOnClusterConfig = storeBeingConvertedToHybrid && (setStore.activeActiveReplicationEnabled - ? controllerConfig.isEnablePartialUpdateForHybridActiveActiveUserStores() - : controllerConfig.isEnablePartialUpdateForHybridNonActiveActiveUserStores()); + ? 
clusterConfig.isEnablePartialUpdateForHybridActiveActiveUserStores() + : clusterConfig.isEnablePartialUpdateForHybridNonActiveActiveUserStores()); if (!currentStore.isWriteComputationEnabled() && shouldEnablePartialUpdateBasedOnClusterConfig) { LOGGER.info("Controller will try to enable partial update based on cluster config for store: " + storeName); /** diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index 49c6ff82d4..9e306d5e6c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -10,7 +10,7 @@ import static com.linkedin.venice.pushmonitor.OfflinePushStatus.HELIX_RESOURCE_NOT_CREATED; import com.linkedin.venice.controller.HelixAdminClient; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.stats.DisabledPartitionStats; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceNoStoreException; @@ -101,7 +101,7 @@ public AbstractPushMonitor( String aggregateRealTimeSourceKafkaUrl, List activeActiveRealTimeSourceKafkaURLs, HelixAdminClient helixAdminClient, - VeniceControllerConfig controllerConfig, + VeniceControllerClusterConfig clusterConfig, PushStatusStoreReader pushStatusStoreReader, DisabledPartitionStats disabledPartitionStats) { this.clusterName = clusterName; @@ -117,23 +117,23 @@ public AbstractPushMonitor( this.helixAdminClient = helixAdminClient; this.disabledPartitionStats = disabledPartitionStats; - this.disableErrorLeaderReplica = controllerConfig.isErrorLeaderReplicaFailOverEnabled(); + this.disableErrorLeaderReplica = clusterConfig.isErrorLeaderReplicaFailOverEnabled(); this.helixClientThrottler = new EventThrottler(10, "push_monitor_helix_client_throttler", false, EventThrottler.BLOCK_STRATEGY); - this.offlineJobResourceAssignmentWaitTimeInMilliseconds = controllerConfig.getOffLineJobWaitTimeInMilliseconds(); + this.offlineJobResourceAssignmentWaitTimeInMilliseconds = clusterConfig.getOffLineJobWaitTimeInMilliseconds(); this.pushStatusCollector = new PushStatusCollector( metadataRepository, pushStatusStoreReader, (topic) -> handleCompletedPush(topic), (topic, details) -> handleErrorPush(topic, details), - controllerConfig.isDaVinciPushStatusScanEnabled(), - controllerConfig.getDaVinciPushStatusScanIntervalInSeconds(), - controllerConfig.getDaVinciPushStatusScanThreadNumber(), - controllerConfig.getDaVinciPushStatusScanNoReportRetryMaxAttempt(), - controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceCount(), - controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio(), - controllerConfig.useDaVinciSpecificExecutionStatusForError()); - this.isOfflinePushMonitorDaVinciPushStatusEnabled = controllerConfig.isDaVinciPushStatusEnabled(); + clusterConfig.isDaVinciPushStatusScanEnabled(), + clusterConfig.getDaVinciPushStatusScanIntervalInSeconds(), + clusterConfig.getDaVinciPushStatusScanThreadNumber(), + clusterConfig.getDaVinciPushStatusScanNoReportRetryMaxAttempt(), + clusterConfig.getDaVinciPushStatusScanMaxOfflineInstanceCount(), + clusterConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio(), + clusterConfig.useDaVinciSpecificExecutionStatusForError()); + 
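The AbstractPushMonitor hunk above is the densest consumer of the renamed config: all of the Da Vinci push-status scan knobs flow through it into the PushStatusCollector. The sketch below simply gathers those getters in one place for reference; the wrapper class and the log4j2 logger wiring are assumptions, only the getter names are taken from the hunk.

    // Sketch only: lists the scan settings AbstractPushMonitor reads from the
    // renamed VeniceControllerClusterConfig; class and logger are illustrative.
    final class DaVinciScanSettingsLoggerSketch {
      private static final org.apache.logging.log4j.Logger LOGGER =
          org.apache.logging.log4j.LogManager.getLogger(DaVinciScanSettingsLoggerSketch.class);

      static void log(com.linkedin.venice.controller.VeniceControllerClusterConfig clusterConfig) {
        LOGGER.info(
            "DaVinci push status scan: enabled={}, intervalSeconds={}, threads={}, "
                + "noReportRetryMaxAttempt={}, maxOfflineInstances={}, maxOfflineRatio={}, "
                + "useDaVinciSpecificExecutionStatusForError={}",
            clusterConfig.isDaVinciPushStatusScanEnabled(),
            clusterConfig.getDaVinciPushStatusScanIntervalInSeconds(),
            clusterConfig.getDaVinciPushStatusScanThreadNumber(),
            clusterConfig.getDaVinciPushStatusScanNoReportRetryMaxAttempt(),
            clusterConfig.getDaVinciPushStatusScanMaxOfflineInstanceCount(),
            clusterConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio(),
            clusterConfig.useDaVinciSpecificExecutionStatusForError());
      }
    }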
this.isOfflinePushMonitorDaVinciPushStatusEnabled = clusterConfig.isDaVinciPushStatusEnabled(); pushStatusCollector.start(); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java index b9191897b8..b6b6fb86e3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java @@ -1,7 +1,7 @@ package com.linkedin.venice.pushmonitor; import com.linkedin.venice.controller.HelixAdminClient; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.stats.DisabledPartitionStats; import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher; import com.linkedin.venice.meta.Instance; @@ -36,7 +36,7 @@ public PartitionStatusBasedPushMonitor( String aggregateRealTimeSourceKafkaUrl, List childDataCenterKafkaUrls, HelixAdminClient helixAdminClient, - VeniceControllerConfig controllerConfig, + VeniceControllerClusterConfig clusterConfig, PushStatusStoreReader pushStatusStoreReader, DisabledPartitionStats disabledPartitionStats) { super( @@ -51,7 +51,7 @@ public PartitionStatusBasedPushMonitor( aggregateRealTimeSourceKafkaUrl, childDataCenterKafkaUrls, helixAdminClient, - controllerConfig, + clusterConfig, pushStatusStoreReader, disabledPartitionStats); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java index ebc450ed7c..fdab0cb687 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java @@ -1,7 +1,7 @@ package com.linkedin.venice.pushmonitor; import com.linkedin.venice.controller.HelixAdminClient; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.stats.DisabledPartitionStats; import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; @@ -55,7 +55,7 @@ public PushMonitorDelegator( String aggregateRealTimeSourceKafkaUrl, List activeActiveRealTimeSourceKafkaURLs, HelixAdminClient helixAdminClient, - VeniceControllerConfig controllerConfig, + VeniceControllerClusterConfig clusterConfig, PushStatusStoreReader pushStatusStoreReader, DisabledPartitionStats disabledPartitionStats) { this.clusterName = clusterName; @@ -73,7 +73,7 @@ public PushMonitorDelegator( aggregateRealTimeSourceKafkaUrl, activeActiveRealTimeSourceKafkaURLs, helixAdminClient, - controllerConfig, + clusterConfig, pushStatusStoreReader, disabledPartitionStats); this.clusterLockManager = clusterLockManager; diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java index aded34a940..12f654d0c8 100644 --- 
a/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java @@ -65,7 +65,7 @@ public class AbstractTestVeniceParentHelixAdmin { TopicManager topicManager; VeniceHelixAdmin internalAdmin; - VeniceControllerConfig config; + VeniceControllerClusterConfig clusterConfig; ZkClient zkClient; VeniceWriter veniceWriter; VeniceParentHelixAdmin parentAdmin = null; @@ -120,14 +120,14 @@ public void setupInternalMocks() { HelixReadWriteStoreRepository storeRepository = mock(HelixReadWriteStoreRepository.class); doReturn(store).when(storeRepository).getStore(any()); - config = mockConfig(clusterName); - doReturn(1).when(config).getReplicationMetadataVersion(); + clusterConfig = mockConfig(clusterName); + doReturn(1).when(clusterConfig).getReplicationMetadataVersion(); controllerClients .put(regionName, ControllerClient.constructClusterControllerClient(clusterName, "localhost", Optional.empty())); doReturn(controllerClients).when(internalAdmin).getControllerClientMap(any()); - resources = mockResources(config, clusterName); + resources = mockResources(clusterConfig, clusterName); doReturn(storeRepository).when(resources).getStoreMetadataRepository(); ZkRoutersClusterManager manager = mock(ZkRoutersClusterManager.class); doReturn(manager).when(resources).getRoutersClusterManager(); @@ -156,7 +156,7 @@ public void setupInternalMocks() { public void initializeParentAdmin(Optional authorizerService) { parentAdmin = new VeniceParentHelixAdmin( internalAdmin, - TestUtils.getMultiClusterConfigFromOneCluster(config), + TestUtils.getMultiClusterConfigFromOneCluster(clusterConfig), false, Optional.empty(), authorizerService); @@ -186,30 +186,30 @@ public void cleanupTestCase() { } } - VeniceControllerConfig mockConfig(String clusterName) { - VeniceControllerConfig config = mock(VeniceControllerConfig.class); - doReturn(clusterName).when(config).getClusterName(); - doReturn(KAFKA_REPLICA_FACTOR).when(config).getKafkaReplicationFactor(); - doReturn(KAFKA_REPLICA_FACTOR).when(config).getAdminTopicReplicationFactor(); - doReturn(10000).when(config).getParentControllerWaitingTimeForConsumptionMs(); - doReturn("fake_kafka_bootstrap_servers").when(config).getKafkaBootstrapServers(); + VeniceControllerClusterConfig mockConfig(String clusterName) { + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterName).when(clusterConfig).getClusterName(); + doReturn(KAFKA_REPLICA_FACTOR).when(clusterConfig).getKafkaReplicationFactor(); + doReturn(KAFKA_REPLICA_FACTOR).when(clusterConfig).getAdminTopicReplicationFactor(); + doReturn(10000).when(clusterConfig).getParentControllerWaitingTimeForConsumptionMs(); + doReturn("fake_kafka_bootstrap_servers").when(clusterConfig).getKafkaBootstrapServers(); // PushJobStatusStore and participant message store are disabled in this unit test by default because many // tests are using verify(veniceWriter).put(...) which could be unpredictable with async setup enabled. - doReturn("").when(config).getPushJobStatusStoreClusterName(); - doReturn(false).when(config).isParticipantMessageStoreEnabled(); + doReturn("").when(clusterConfig).getPushJobStatusStoreClusterName(); + doReturn(false).when(clusterConfig).isParticipantMessageStoreEnabled(); // Disable background threads that may interfere when we try to re-mock internalAdmin later in the tests. 
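The mockConfig(...) helper being rewritten here is representative of how every unit test in this patch adapts: the Mockito mock target changes class, and the stubs stay the same. A minimal stand-alone version of that pattern follows; the test class, cluster name, and stubbed values are placeholders, while the getters are ones that appear in this patch.

    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.mock;

    import com.linkedin.venice.controller.VeniceControllerClusterConfig;
    import org.testng.Assert;
    import org.testng.annotations.Test;

    public class ClusterConfigMockSketchTest {
      @Test
      public void stubsOnlyWhatTheTestNeeds() {
        // Same pattern as mockConfig(...) above: mock the renamed class and stub
        // only the getters the code under test will actually call.
        VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class);
        doReturn("venice-cluster0").when(clusterConfig).getClusterName();
        doReturn(3).when(clusterConfig).getKafkaReplicationFactor();

        Assert.assertEquals(clusterConfig.getClusterName(), "venice-cluster0");
        Assert.assertEquals(clusterConfig.getKafkaReplicationFactor(), 3);
      }
    }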
- doReturn(Long.MAX_VALUE).when(config).getTerminalStateTopicCheckerDelayMs(); + doReturn(Long.MAX_VALUE).when(clusterConfig).getTerminalStateTopicCheckerDelayMs(); Map childClusterMap = new HashMap<>(); childClusterMap.put(regionName, "localhost"); - doReturn(childClusterMap).when(config).getChildDataCenterControllerUrlMap(); - doReturn(MAX_PARTITION_NUM).when(config).getMaxNumberOfPartitions(); - doReturn(DefaultIdentityParser.class.getName()).when(config).getIdentityParserClassName(); - return config; + doReturn(childClusterMap).when(clusterConfig).getChildDataCenterControllerUrlMap(); + doReturn(MAX_PARTITION_NUM).when(clusterConfig).getMaxNumberOfPartitions(); + doReturn(DefaultIdentityParser.class.getName()).when(clusterConfig).getIdentityParserClassName(); + return clusterConfig; } - HelixVeniceClusterResources mockResources(VeniceControllerConfig config, String clusterName) { + HelixVeniceClusterResources mockResources(VeniceControllerClusterConfig clusterConfig, String clusterName) { HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class); - doReturn(config).when(resources).getConfig(); + doReturn(clusterConfig).when(resources).getConfig(); doReturn(resources).when(internalAdmin).getHelixVeniceClusterResources(clusterName); doReturn(clusterLockManager).when(resources).getClusterLockManager(); return resources; diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestDisabledPartitionEnablerService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestDisabledPartitionEnablerService.java index ef4f99fd80..b13dfd0468 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestDisabledPartitionEnablerService.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestDisabledPartitionEnablerService.java @@ -22,12 +22,12 @@ public void testCleanupBackupVersion() throws Exception { VeniceControllerMultiClusterConfig config = mock(VeniceControllerMultiClusterConfig.class); long defaultRetentionMs = TimeUnit.DAYS.toMillis(7); doReturn(defaultRetentionMs).when(config).getBackupVersionDefaultRetentionMs(); - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - doReturn(controllerConfig).when(config).getControllerConfig(anyString()); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterConfig).when(config).getControllerConfig(anyString()); Set clusters = new HashSet<>(); doReturn(true).when(admin).isLeaderControllerFor(any()); clusters.add(clusterName); - doReturn(true).when(controllerConfig).isEnableDisabledReplicaEnabled(); + doReturn(true).when(clusterConfig).isEnableDisabledReplicaEnabled(); doReturn(clusters).when(config).getClusters(); TestMockTime time = new TestMockTime(); DisabledPartitionEnablerService service = new DisabledPartitionEnablerService(admin, config, time); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java index 0baa868534..4b382818b8 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java @@ -163,8 +163,8 @@ public void testCleanupBackupVersion() { 
VeniceControllerMultiClusterConfig config = mock(VeniceControllerMultiClusterConfig.class); long defaultRetentionMs = TimeUnit.DAYS.toMillis(7); doReturn(defaultRetentionMs).when(config).getBackupVersionDefaultRetentionMs(); - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - doReturn(controllerConfig).when(config).getControllerConfig(anyString()); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterConfig).when(config).getControllerConfig(anyString()); doReturn(mockClusterResource).when(admin).getHelixVeniceClusterResources(anyString()); doReturn(clusterManager).when(mockClusterResource).getRoutersClusterManager(); StoreBackupVersionCleanupService service = @@ -223,9 +223,9 @@ public void testMetadataBasedCleanupBackupVersion() throws IOException { doReturn(new ByteArrayInputStream(OBJECT_MAPPER.writeValueAsBytes(currentVersionResponse))).when(entity) .getContent(); - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - doReturn(controllerConfig).when(config).getControllerConfig(anyString()); - doReturn(true).when(controllerConfig).isBackupVersionMetadataFetchBasedCleanupEnabled(); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterConfig).when(config).getControllerConfig(anyString()); + doReturn(true).when(clusterConfig).isBackupVersionMetadataFetchBasedCleanupEnabled(); doReturn(mockClusterResource).when(admin).getHelixVeniceClusterResources(anyString()); doReturn(clusterManager).when(mockClusterResource).getRoutersClusterManager(); Set instSet = new HashSet<>(); @@ -267,9 +267,9 @@ public void testCleanupBackupVersionSleepValidation() throws Exception { LiveInstanceMonitor liveInstanceMonitor = mock(LiveInstanceMonitor.class); doReturn(defaultRetentionMs).when(config).getBackupVersionDefaultRetentionMs(); doReturn(defaultRetentionMs).when(config).getBackupVersionDefaultRetentionMs(); - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - doReturn(controllerConfig).when(config).getControllerConfig(any()); - doReturn(true).when(controllerConfig).isBackupVersionRetentionBasedCleanupEnabled(); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterConfig).when(config).getControllerConfig(any()); + doReturn(true).when(clusterConfig).isBackupVersionRetentionBasedCleanupEnabled(); doReturn(true).when(admin).isLeaderControllerFor(any()); doReturn(liveInstanceMonitor).when(admin).getLiveInstanceMonitor(anyString()); doReturn(mockClusterResource).when(admin).getHelixVeniceClusterResources(anyString()); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUnusedValueSchemaCleanupService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUnusedValueSchemaCleanupService.java index fc1119e5bb..3aaa84a0c2 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUnusedValueSchemaCleanupService.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUnusedValueSchemaCleanupService.java @@ -45,10 +45,10 @@ public void testCleanupUnusedSchema() throws Exception { VeniceControllerMultiClusterConfig config = mock(VeniceControllerMultiClusterConfig.class); doReturn(1).when(config).getUnusedSchemaCleanupIntervalSeconds(); doReturn(1).when(config).getMinSchemaCountToKeep(); - VeniceControllerConfig controllerConfig = 
mock(VeniceControllerConfig.class); - doReturn(controllerConfig).when(config).getControllerConfig(any()); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterConfig).when(config).getControllerConfig(any()); doReturn(true).when(parentHelixAdmin).isLeaderControllerFor(anyString()); - doReturn(true).when(controllerConfig).isUnusedValueSchemaCleanupServiceEnabled(); + doReturn(true).when(clusterConfig).isUnusedValueSchemaCleanupServiceEnabled(); doReturn(true).when(parentHelixAdmin).isLeaderControllerFor(any()); ReadWriteSchemaRepository schemaRepository = mock(ReadWriteSchemaRepository.class); HelixVeniceClusterResources clusterResources = mock(HelixVeniceClusterResources.class); @@ -93,8 +93,8 @@ public void testCleanupUnusedSchema() throws Exception { void testGetUnusedSchema() { VeniceParentHelixAdmin parentHelixAdmin = mock(VeniceParentHelixAdmin.class); VeniceControllerMultiClusterConfig config = mock(VeniceControllerMultiClusterConfig.class); - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - doReturn(true).when(controllerConfig).isUnusedValueSchemaCleanupServiceEnabled(); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(true).when(clusterConfig).isUnusedValueSchemaCleanupServiceEnabled(); doReturn(true).when(parentHelixAdmin).isLeaderControllerFor(any()); Store store = mockStore(); ReadWriteSchemaRepository schemaRepository = mock(ReadWriteSchemaRepository.class); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceControllerConfig.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceControllerClusterConfig.java similarity index 83% rename from services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceControllerConfig.java rename to services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceControllerClusterConfig.java index eb8a411efa..28633252d1 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceControllerConfig.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceControllerClusterConfig.java @@ -12,7 +12,7 @@ import org.testng.annotations.Test; -public class TestVeniceControllerConfig { +public class TestVeniceControllerClusterConfig { private static final String DELIMITER = ",\\s*"; private static final String WHITE_LIST = "dc1,dc2"; @@ -22,7 +22,7 @@ public void canParseClusterMap() { builder.put("child.cluster.url.dc1", "http://host:1234, http://host:5678") .put("child.cluster.url.dc2", "http://host:1234, http://host:5678"); - Map map = VeniceControllerConfig.parseClusterMap(builder.build(), WHITE_LIST); + Map map = VeniceControllerClusterConfig.parseClusterMap(builder.build(), WHITE_LIST); Assert.assertEquals(map.size(), 2); Assert.assertTrue(map.keySet().contains("dc1")); @@ -38,7 +38,7 @@ public void canParseD2ClusterMap() { PropertyBuilder builder = new PropertyBuilder(); builder.put("child.cluster.d2.zkHost.dc1", "zkAddress1").put("child.cluster.d2.zkHost.dc2", "zkAddress2"); - Map map = VeniceControllerConfig.parseClusterMap(builder.build(), WHITE_LIST, true); + Map map = VeniceControllerClusterConfig.parseClusterMap(builder.build(), WHITE_LIST, true); Assert.assertEquals(map.get("dc1").split(DELIMITER).length, 1); Assert.assertTrue(map.get("dc2").split(DELIMITER)[0].equals("zkAddress2")); } @@ -51,7 +51,7 @@ public void 
canParseBannedPaths() { // Add the list of disabled endpoints, '/' are optional, and will be ignored. Invalid values will be filtered builder.put(CONTROLLER_DISABLED_ROUTES, "request_topic, /discover_cluster, foo,bar"); - List parsedRoutes = VeniceControllerConfig + List parsedRoutes = VeniceControllerClusterConfig .parseControllerRoutes(builder.build(), CONTROLLER_DISABLED_ROUTES, Collections.emptyList()); // Make sure it looks right. @@ -64,27 +64,27 @@ public void canParseBannedPaths() { public void emptyAllowlist() { PropertyBuilder build = new PropertyBuilder().put("child.cluster.url.dc1", "http://host:1234, http://host:5678") .put("child.cluster.url.dc2", "http://host:1234, http://host:5678"); - VeniceControllerConfig.parseClusterMap(build.build(), ""); + VeniceControllerClusterConfig.parseClusterMap(build.build(), ""); } @Test(expectedExceptions = VeniceException.class) public void nullAllowlist() { PropertyBuilder build = new PropertyBuilder().put("child.cluster.url.dc1", "http://host:1234, http://host:5678") .put("child.cluster.url.dc2", "http://host:1234, http://host:5678"); - VeniceControllerConfig.parseClusterMap(build.build(), ""); + VeniceControllerClusterConfig.parseClusterMap(build.build(), ""); } @Test(expectedExceptions = VeniceException.class) public void errOnMissingScheme() { PropertyBuilder builder = new PropertyBuilder(); builder.put("child.cluster.url.dc1", "host:1234"); - VeniceControllerConfig.parseClusterMap(builder.build(), WHITE_LIST); + VeniceControllerClusterConfig.parseClusterMap(builder.build(), WHITE_LIST); } @Test(expectedExceptions = VeniceException.class) public void errOnMissingNodes() { PropertyBuilder builder = new PropertyBuilder(); builder.put("child.cluster.url.dc1", ""); - VeniceControllerConfig.parseClusterMap(builder.build(), WHITE_LIST); + VeniceControllerClusterConfig.parseClusterMap(builder.build(), WHITE_LIST); } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java index c052635862..6f88501967 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java @@ -248,9 +248,9 @@ private void testCheckResourceCleanupBeforeStoreCreationWithParams( public void testSourceRegionSelectionForTargetedRegionPush() { // cluster config setup VeniceControllerMultiClusterConfig multiClusterConfigs = mock(VeniceControllerMultiClusterConfig.class); - VeniceControllerConfig config = mock(VeniceControllerConfig.class); - doReturn(config).when(multiClusterConfigs).getControllerConfig("test_cluster"); - doReturn("dc-4").when(config).getNativeReplicationSourceFabric(); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterConfig).when(multiClusterConfigs).getControllerConfig("test_cluster"); + doReturn("dc-4").when(clusterConfig).getNativeReplicationSourceFabric(); // store setup Store store = mock(Store.class); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index 5fc9737708..13ee1c819e 100644 --- 
a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -163,8 +163,10 @@ public void testStartWhenTopicNotExists() { private static class AsyncSetupMockVeniceParentHelixAdmin extends VeniceParentHelixAdmin { private Map systemStores = new VeniceConcurrentHashMap<>(); - public AsyncSetupMockVeniceParentHelixAdmin(VeniceHelixAdmin veniceHelixAdmin, VeniceControllerConfig config) { - super(veniceHelixAdmin, TestUtils.getMultiClusterConfigFromOneCluster(config)); + public AsyncSetupMockVeniceParentHelixAdmin( + VeniceHelixAdmin veniceHelixAdmin, + VeniceControllerClusterConfig clusterConfig) { + super(veniceHelixAdmin, TestUtils.getMultiClusterConfigFromOneCluster(clusterConfig)); } public boolean isAsyncSetupRunning(String clusterName) { @@ -299,11 +301,11 @@ public void testAddStore() { @Test public void testCreateStoreForMultiCluster() { String secondCluster = "testCreateStoreForMultiCluster"; - VeniceControllerConfig configForSecondCluster = mockConfig(secondCluster); - mockResources(configForSecondCluster, secondCluster); - Map configMap = new HashMap<>(); - configMap.put(clusterName, config); - configMap.put(secondCluster, configForSecondCluster); + VeniceControllerClusterConfig clusterConfigForSecondCluster = mockConfig(secondCluster); + mockResources(clusterConfigForSecondCluster, secondCluster); + Map configMap = new HashMap<>(); + configMap.put(clusterName, clusterConfig); + configMap.put(secondCluster, clusterConfigForSecondCluster); parentAdmin = new VeniceParentHelixAdmin(internalAdmin, new VeniceControllerMultiClusterConfig(configMap)); Map writerMap = new HashMap<>(); for (String cluster: configMap.keySet()) { @@ -750,7 +752,7 @@ public void testIdempotentIncrementVersionWhenNoPreviousTopics() { null, -1); try (PartialMockVeniceParentHelixAdmin partialMockParentAdmin = - new PartialMockVeniceParentHelixAdmin(internalAdmin, config)) { + new PartialMockVeniceParentHelixAdmin(internalAdmin, clusterConfig)) { VeniceWriter veniceWriter = mock(VeniceWriter.class); partialMockParentAdmin.setVeniceWriterForCluster(clusterName, veniceWriter); @@ -796,8 +798,10 @@ private static class PartialMockVeniceParentHelixAdmin extends VeniceParentHelix */ private Map storeVersionToKillJobStatus = new HashMap<>(); - public PartialMockVeniceParentHelixAdmin(VeniceHelixAdmin veniceHelixAdmin, VeniceControllerConfig config) { - super(veniceHelixAdmin, TestUtils.getMultiClusterConfigFromOneCluster(config)); + public PartialMockVeniceParentHelixAdmin( + VeniceHelixAdmin veniceHelixAdmin, + VeniceControllerClusterConfig clusterConfig) { + super(veniceHelixAdmin, TestUtils.getMultiClusterConfigFromOneCluster(clusterConfig)); } public void setOfflineJobStatus(ExecutionStatus executionStatus) { @@ -846,7 +850,7 @@ public void testIncrementVersionWhenPreviousTopicsExistAndOfflineJobIsStillRunni .waitVersion(eq(clusterName), eq(storeName), eq(1), any()); try (PartialMockVeniceParentHelixAdmin partialMockParentAdmin = - new PartialMockVeniceParentHelixAdmin(internalAdmin, config)) { + new PartialMockVeniceParentHelixAdmin(internalAdmin, clusterConfig)) { partialMockParentAdmin.setOfflineJobStatus(ExecutionStatus.PROGRESS); assertThrows( @@ -881,7 +885,7 @@ public void testIdempotentIncrementVersionWhenPreviousTopicsExistAndOfflineJobIs doReturn(new Pair<>(store, version)).when(internalAdmin) .waitVersion(eq(clusterName), eq(storeName), 
eq(version.getNumber()), any()); try (PartialMockVeniceParentHelixAdmin partialMockParentAdmin = - new PartialMockVeniceParentHelixAdmin(internalAdmin, config)) { + new PartialMockVeniceParentHelixAdmin(internalAdmin, clusterConfig)) { partialMockParentAdmin.setOfflineJobStatus(ExecutionStatus.NEW); VeniceWriter veniceWriter = mock(VeniceWriter.class); partialMockParentAdmin.setVeniceWriterForCluster(clusterName, veniceWriter); @@ -971,7 +975,7 @@ public void testIdempotentIncrementVersionWhenPreviousTopicsExistButTruncated() null, -1); try (PartialMockVeniceParentHelixAdmin partialMockParentAdmin = - new PartialMockVeniceParentHelixAdmin(internalAdmin, config)) { + new PartialMockVeniceParentHelixAdmin(internalAdmin, clusterConfig)) { partialMockParentAdmin.setOfflineJobStatus(ExecutionStatus.NEW); VeniceWriter veniceWriter = mock(VeniceWriter.class); partialMockParentAdmin.setVeniceWriterForCluster(clusterName, veniceWriter); @@ -1043,7 +1047,7 @@ public void testIdempotentIncrementVersionWhenPreviousTopicsExistAndOfflineJobIs doReturn(new Pair<>(store, version)).when(internalAdmin) .waitVersion(eq(clusterName), eq(storeName), eq(version.getNumber()), any()); try (PartialMockVeniceParentHelixAdmin partialMockParentAdmin = - new PartialMockVeniceParentHelixAdmin(internalAdmin, config)) { + new PartialMockVeniceParentHelixAdmin(internalAdmin, clusterConfig)) { partialMockParentAdmin.setOfflineJobStatus(ExecutionStatus.NEW); try { partialMockParentAdmin.incrementVersionIdempotent(clusterName, storeName, pushJobId, 1, 1); @@ -1092,7 +1096,7 @@ public void testIdempotentIncrementVersionWhenPreviousTopicsDoNotExistButVersion null, -1); try (PartialMockVeniceParentHelixAdmin partialMockParentAdmin = - spy(new PartialMockVeniceParentHelixAdmin(internalAdmin, config))) { + spy(new PartialMockVeniceParentHelixAdmin(internalAdmin, clusterConfig))) { Version newVersion = partialMockParentAdmin.incrementVersionIdempotent( clusterName, storeName, @@ -1145,8 +1149,8 @@ public void testIdempotentIncrementVersionWhenPreviousPushIsARepushAndIncomingPu store.addVersion(version); doReturn(store).when(mockParentAdmin).getStore(clusterName, storeName); - Map configMap = new HashMap<>(); - configMap.put(clusterName, config); + Map configMap = new HashMap<>(); + configMap.put(clusterName, clusterConfig); doReturn( (LingeringStoreVersionChecker) ( @@ -1248,8 +1252,8 @@ public void testIdempotentIncrementVersionWhenPreviousPushIsARepushAndIncomingPu store.addVersion(version); doReturn(store).when(mockParentAdmin).getStore(clusterName, storeName); - Map configMap = new HashMap<>(); - configMap.put(clusterName, config); + Map configMap = new HashMap<>(); + configMap.put(clusterName, clusterConfig); doReturn( (LingeringStoreVersionChecker) ( @@ -1355,8 +1359,8 @@ public void testIdempotentIncrementVersionWhenPreviousPushIsARepushAndIncomingPu store.addVersion(version); doReturn(store).when(mockParentAdmin).getStore(clusterName, storeName); - Map configMap = new HashMap<>(); - configMap.put(clusterName, config); + Map configMap = new HashMap<>(); + configMap.put(clusterName, clusterConfig); doReturn( (LingeringStoreVersionChecker) ( @@ -2401,7 +2405,7 @@ public void testAdminCanCleanupLeakingTopics() { @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) public void testAdminCanKillLingeringVersion(boolean isIncrementalPush) { try (PartialMockVeniceParentHelixAdmin partialMockParentAdmin = - new PartialMockVeniceParentHelixAdmin(internalAdmin, config)) { + new 
PartialMockVeniceParentHelixAdmin(internalAdmin, clusterConfig)) { long startTime = System.currentTimeMillis(); TestMockTime mockTime = new TestMockTime(startTime); partialMockParentAdmin.setTimer(mockTime); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/init/SystemStoreInitializationHelperTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/init/SystemStoreInitializationHelperTest.java index a38e02d3bd..ef177e10d1 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/init/SystemStoreInitializationHelperTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/init/SystemStoreInitializationHelperTest.java @@ -14,7 +14,7 @@ import com.linkedin.venice.VeniceConstants; import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.controller.Admin; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig; import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; @@ -94,9 +94,9 @@ public void testInitialSystemStoreSetup(boolean explicitlyProvidedKeySchema) { .when(admin) .getStore(clusterName, systemStoreName); - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - doReturn(controllerConfig).when(multiClusterConfigs).getControllerConfig(clusterName); - doReturn(partitionCount).when(controllerConfig).getMinNumberOfPartitions(); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterConfig).when(multiClusterConfigs).getControllerConfig(clusterName); + doReturn(partitionCount).when(clusterConfig).getMinNumberOfPartitions(); SystemStoreInitializationHelper.setupSystemStore( clusterName, @@ -180,8 +180,8 @@ public void testSystemStoreEvolveValueSchema(boolean explicitlyProvidedKeySchema .when(admin) .getValueSchemas(clusterName, systemStoreName); - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - doReturn(controllerConfig).when(multiClusterConfigs).getControllerConfig(clusterName); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterConfig).when(multiClusterConfigs).getControllerConfig(clusterName); SystemStoreInitializationHelper.setupSystemStore( clusterName, diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java index 6327c27ac2..cf8e7f9133 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java @@ -15,7 +15,7 @@ import static org.testng.Assert.assertTrue; import com.linkedin.venice.controller.Admin; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig; import com.linkedin.venice.controller.stats.TopicCleanupServiceStats; import com.linkedin.venice.exceptions.VeniceException; @@ -78,9 +78,9 @@ public void setUp() { .getSourceOfTruthAdminAdapterFactory(); 
doReturn(1).when(admin).getMinNumberOfUnusedKafkaTopicsToPreserve(); - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - doReturn(controllerConfig).when(veniceControllerMultiClusterConfig).getCommonConfig(); - doReturn("local,remote").when(controllerConfig).getChildDatacenters(); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterConfig).when(veniceControllerMultiClusterConfig).getCommonConfig(); + doReturn("local,remote").when(clusterConfig).getChildDatacenters(); Map dataCenterToBootstrapServerMap = new HashMap<>(); dataCenterToBootstrapServerMap.put("local", "local"); dataCenterToBootstrapServerMap.put("remote", "remote"); @@ -443,9 +443,9 @@ public void testExtractVersionTopicsToCleanupIgnoresInputWithNonVersionTopics() public void testCleanVeniceTopicsBlockRTTopicDeletionWhenMisconfigured() { // RT topic deletion should be blocked when controller is misconfigured // Mis-configured where local data center is not in the child data centers list - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - doReturn(controllerConfig).when(veniceControllerMultiClusterConfig).getCommonConfig(); - doReturn("remote").when(controllerConfig).getChildDatacenters(); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterConfig).when(veniceControllerMultiClusterConfig).getCommonConfig(); + doReturn("remote").when(clusterConfig).getChildDatacenters(); TopicCleanupService blockedTopicCleanupService = new TopicCleanupService( admin, veniceControllerMultiClusterConfig, diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java index 4f82197562..1994ba8112 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java @@ -6,7 +6,7 @@ import static org.mockito.Mockito.verify; import com.linkedin.venice.controller.Admin; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig; import com.linkedin.venice.controller.stats.TopicCleanupServiceStats; import com.linkedin.venice.pubsub.PubSubClientsFactory; @@ -37,10 +37,10 @@ public void setUp() { VeniceControllerMultiClusterConfig config = mock(VeniceControllerMultiClusterConfig.class); doReturn(1000l).when(config).getTopicCleanupSleepIntervalBetweenTopicListFetchMs(); doReturn(2).when(config).getTopicCleanupDelayFactor(); - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - doReturn(controllerConfig).when(config).getCommonConfig(); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterConfig).when(config).getCommonConfig(); doReturn(new ApacheKafkaAdminAdapterFactory()).when(config).getSourceOfTruthAdminAdapterFactory(); - doReturn("fabric1,fabric2").when(controllerConfig).getChildDatacenters(); + doReturn("fabric1,fabric2").when(clusterConfig).getChildDatacenters(); String kafkaClusterKey1 = "fabric1"; String kafkaClusterKey2 = "fabric2"; diff --git 
a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java index 81c2aed62d..3095530f9d 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java @@ -6,7 +6,7 @@ import static org.mockito.Mockito.verify; import com.linkedin.venice.controller.Admin; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig; import com.linkedin.venice.controller.stats.TopicCleanupServiceStats; import com.linkedin.venice.pubsub.PubSubClientsFactory; @@ -36,9 +36,9 @@ public void setUp() { VeniceControllerMultiClusterConfig config = mock(VeniceControllerMultiClusterConfig.class); doReturn(1000l).when(config).getTopicCleanupSleepIntervalBetweenTopicListFetchMs(); doReturn(2).when(config).getTopicCleanupDelayFactor(); - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - doReturn(controllerConfig).when(config).getCommonConfig(); - doReturn("dc1").when(controllerConfig).getChildDatacenters(); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + doReturn(clusterConfig).when(config).getCommonConfig(); + doReturn("dc1").when(clusterConfig).getChildDatacenters(); TopicCleanupServiceStats topicCleanupServiceStats = mock(TopicCleanupServiceStats.class); doReturn(new ApacheKafkaAdminAdapterFactory()).when(pubSubClientsFactory).getAdminAdapterFactory(); doReturn(new ApacheKafkaAdminAdapterFactory()).when(config).getSourceOfTruthAdminAdapterFactory(); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/TestAdminConsumerService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/TestAdminConsumerService.java index 9629005f65..93797d939e 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/TestAdminConsumerService.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/TestAdminConsumerService.java @@ -14,7 +14,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.helix.HelixAdapterSerializer; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; @@ -59,7 +59,7 @@ public void testMultipleAdminConsumerServiceWithSameMetricsRepo() { .put(CHILD_CLUSTER_ALLOWLIST, someClusterName) .put(SslUtils.getVeniceLocalSslProperties()) .build(); - VeniceControllerConfig controllerConfig = new VeniceControllerConfig(props); + VeniceControllerClusterConfig clusterConfig = new VeniceControllerClusterConfig(props); PubSubConsumerAdapterFactory consumerFactory = mock(PubSubConsumerAdapterFactory.class); @@ -81,7 +81,7 @@ public void testMultipleAdminConsumerServiceWithSameMetricsRepo() { try { adminConsumerService1 = new AdminConsumerService( admin, - controllerConfig, + clusterConfig, 
metricsRepository, pubSubTopicRepository, pubSubMessageDeserializer); @@ -93,7 +93,7 @@ public void testMultipleAdminConsumerServiceWithSameMetricsRepo() { try { adminConsumerService2 = new AdminConsumerService( admin, - controllerConfig, + clusterConfig, metricsRepository, pubSubTopicRepository, pubSubMessageDeserializer); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/utils/ParentControllerConfigUpdateUtilsTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/utils/ParentControllerConfigUpdateUtilsTest.java index c4883e34ec..c7662404e5 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/utils/ParentControllerConfigUpdateUtilsTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/utils/ParentControllerConfigUpdateUtilsTest.java @@ -5,7 +5,7 @@ import static org.mockito.Mockito.when; import com.linkedin.venice.controller.HelixVeniceClusterResources; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controller.VeniceParentHelixAdmin; import com.linkedin.venice.controller.kafka.protocol.admin.UpdateStore; @@ -30,8 +30,8 @@ public void testPartialUpdateConfigUpdate() { when(parentHelixAdmin.getVeniceHelixAdmin()).thenReturn(veniceHelixAdmin); when(veniceHelixAdmin.getStore(anyString(), anyString())).thenReturn(store); HelixVeniceClusterResources helixVeniceClusterResources = mock(HelixVeniceClusterResources.class); - VeniceControllerConfig controllerConfig = mock(VeniceControllerConfig.class); - when(helixVeniceClusterResources.getConfig()).thenReturn(controllerConfig); + VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class); + when(helixVeniceClusterResources.getConfig()).thenReturn(clusterConfig); when(veniceHelixAdmin.getHelixVeniceClusterResources(anyString())).thenReturn(helixVeniceClusterResources); SchemaEntry schemaEntry = new SchemaEntry(1, TestWriteUtils.USER_WITH_DEFAULT_SCHEMA); when(veniceHelixAdmin.getValueSchemas(anyString(), anyString())).thenReturn(Collections.singletonList(schemaEntry)); @@ -89,8 +89,8 @@ public void testPartialUpdateConfigUpdate() { * No request. */ partialUpdateRequest = Optional.empty(); - when(controllerConfig.isEnablePartialUpdateForHybridActiveActiveUserStores()).thenReturn(false); - when(controllerConfig.isEnablePartialUpdateForHybridNonActiveActiveUserStores()).thenReturn(false); + when(clusterConfig.isEnablePartialUpdateForHybridActiveActiveUserStores()).thenReturn(false); + when(clusterConfig.isEnablePartialUpdateForHybridNonActiveActiveUserStores()).thenReturn(false); // Case 1: partial update config not updated. setStore = new UpdateStore(); Assert.assertFalse( @@ -111,8 +111,8 @@ public void testPartialUpdateConfigUpdate() { setStore, true)); // Case 2: partial update config updated. 
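Case 2 below flips both cluster-level partial-update flags to true. For reference, the decision those flags feed, introduced in ParentControllerConfigUpdateUtils earlier in this patch, reduces to the expression sketched here; variable names mirror that method's parameters and are shown only for context.

    // The cluster default applies only while the store is being converted to hybrid,
    // and which flag is consulted depends on whether active-active replication will
    // end up enabled on the store.
    boolean shouldEnablePartialUpdateBasedOnClusterConfig =
        storeBeingConvertedToHybrid
            && (setStore.activeActiveReplicationEnabled
                ? clusterConfig.isEnablePartialUpdateForHybridActiveActiveUserStores()
                : clusterConfig.isEnablePartialUpdateForHybridNonActiveActiveUserStores());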
- when(controllerConfig.isEnablePartialUpdateForHybridActiveActiveUserStores()).thenReturn(true); - when(controllerConfig.isEnablePartialUpdateForHybridNonActiveActiveUserStores()).thenReturn(true); + when(clusterConfig.isEnablePartialUpdateForHybridActiveActiveUserStores()).thenReturn(true); + when(clusterConfig.isEnablePartialUpdateForHybridNonActiveActiveUserStores()).thenReturn(true); setStore = new UpdateStore(); Assert.assertTrue( ParentControllerConfigUpdateUtils.checkAndMaybeApplyPartialUpdateConfig( diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java index fefa850a5f..bed67ce8d4 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java @@ -25,7 +25,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; -import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; import com.linkedin.venice.helix.HelixState; @@ -78,7 +78,7 @@ public abstract class AbstractPushMonitorTest { private AggPushHealthStats mockPushHealthStats; protected ClusterLockManager clusterLockManager; - protected VeniceControllerConfig mockControllerConfig; + protected VeniceControllerClusterConfig mockControllerConfig; protected MetricsRepository mockMetricRepo; @@ -110,7 +110,7 @@ public void setUp() { mockRoutingDataRepo = mock(RoutingDataRepository.class); mockPushHealthStats = mock(AggPushHealthStats.class); clusterLockManager = new ClusterLockManager(clusterName); - mockControllerConfig = mock(VeniceControllerConfig.class); + mockControllerConfig = mock(VeniceControllerClusterConfig.class); when(mockMetricRepo.sensor(anyString(), any())).thenReturn(mock(Sensor.class)); when(mockControllerConfig.isErrorLeaderReplicaFailOverEnabled()).thenReturn(true); when(mockControllerConfig.isDaVinciPushStatusEnabled()).thenReturn(true); @@ -1039,7 +1039,7 @@ protected ReadWriteStoreRepository getMockStoreRepo() { return mockStoreRepo; } - protected VeniceControllerConfig getMockControllerConfig() { + protected VeniceControllerClusterConfig getMockControllerConfig() { return mockControllerConfig; }
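Taken together, the change is a pure class rename plus local variable renames, so for any out-of-tree caller the migration is mechanical. A hedged sketch of the only visible difference, assuming just the constructor used by the tests in this patch (the wrapper class is illustrative, and the import path for VeniceProperties is assumed):

    import java.util.Properties;

    import com.linkedin.venice.controller.VeniceControllerClusterConfig;
    import com.linkedin.venice.utils.VeniceProperties;

    public final class ClusterConfigMigrationSketch {
      public static VeniceControllerClusterConfig build(Properties controllerProperties) {
        // Before this patch: new VeniceControllerConfig(new VeniceProperties(controllerProperties));
        // After this patch:  same properties, renamed class.
        return new VeniceControllerClusterConfig(new VeniceProperties(controllerProperties));
      }
    }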