Skip to content

Commit

Permalink
[pubsub][test] Add support for passing pubsub broker configs to Venic…
Browse files Browse the repository at this point in the history
…e components in the test framework (linkedin#523)

Sometimes to construct pubsub clients we need to pass additional details about the pubsub brokers.    
This PR will let us pass these details to the pubsub clients in integration testing env.
  • Loading branch information
sushantmane authored Jul 1, 2023
1 parent 77c8547 commit 6ef7955
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import static com.linkedin.venice.utils.SslUtils.VeniceTlsConfiguration;

import com.linkedin.venice.pubsub.api.PubSubClientsFactory;
import com.linkedin.venice.utils.TestUtils;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


public abstract class PubSubBrokerWrapper extends ProcessWrapper {
Expand All @@ -29,4 +34,17 @@ public String toString() {
public abstract String getPubSubClusterName();

public abstract String getRegionName();

Map<String, String> getAdditionalConfig() {
return Collections.emptyMap();
}

/**
* Configs that have the same key, will be merged into a single config with individual values separated by commas.
*/
public static Map<String, String> combineAdditionalConfigs(List<PubSubBrokerWrapper> pubSubBrokerWrappers) {
List<Map<String, String>> additionalConfigs =
pubSubBrokerWrappers.stream().map(PubSubBrokerWrapper::getAdditionalConfig).collect(Collectors.toList());
return TestUtils.combineConfigs(additionalConfigs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ static ServiceProvider<VeniceClusterWrapper> generateService(VeniceClusterCreate
"PubSubBrokerWrapper region name " + pubSubBrokerWrapper.getRegionName()
+ " does not match with the region name " + options.getRegionName() + " in the options");
}

pubSubBrokerWrapper.getAdditionalConfig().forEach((k, v) -> options.getExtraProperties().putIfAbsent(k, v));
// Setup D2 for controller
String zkAddress = zkServerWrapper.getAddress();
D2TestUtils.setupD2Config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ static StatefulServiceProvider<VeniceControllerWrapper> generateService(VeniceCo
}
}

// Add additional config from PubSubBrokerWrapper to server.properties iff the key is not already present
Map<String, String> brokerDetails = options.getKafkaBroker().getAdditionalConfig();
for (Map.Entry<String, String> entry: brokerDetails.entrySet()) {
builder.putIfAbsent(entry.getKey(), entry.getValue());
}

VeniceProperties props = builder.build();
propertiesList.add(props);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARTITION_SIZE_BYTES;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_REPLICATION_FACTOR;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_SSL_TO_STORAGE_NODES;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.STANDALONE_REGION_NAME;

import com.linkedin.venice.utils.VeniceProperties;
import java.util.Collections;
Expand Down Expand Up @@ -182,7 +183,7 @@ public String toString() {
.toString();
}

public VeniceMultiClusterCreateOptions(Builder builder) {
private VeniceMultiClusterCreateOptions(Builder builder) {
regionName = builder.regionName;
numberOfClusters = builder.numberOfClusters;
numberOfControllers = builder.numberOfControllers;
Expand Down Expand Up @@ -342,7 +343,7 @@ private void addDefaults() {
kafkaClusterMap = Collections.emptyMap();
}
if (regionName == null) {
throw new IllegalArgumentException("regionName cannot be null");
regionName = STANDALONE_REGION_NAME;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ static ServiceProvider<VeniceMultiClusterWrapper> generateService(VeniceMultiClu
"PubSubBrokerWrapper region name " + pubSubBrokerWrapper.getRegionName()
+ " does not match with the region name " + options.getRegionName() + " in the options");
}
Map<String, String> pubBrokerDetails = pubSubBrokerWrapper.getAdditionalConfig();
String[] clusterNames = new String[options.getNumberOfClusters()];
Map<String, String> clusterToD2 = new HashMap<>();
Map<String, String> clusterToServerD2 = new HashMap<>();
Expand Down Expand Up @@ -106,6 +107,7 @@ static ServiceProvider<VeniceMultiClusterWrapper> generateService(VeniceMultiClu
ClientConfig.defaultGenericClientConfig("")
.setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
.setD2Client(clientConfigD2Client));
pubBrokerDetails.forEach((key, value) -> controllerProperties.putIfAbsent(key, value));
VeniceControllerCreateOptions controllerCreateOptions =
new VeniceControllerCreateOptions.Builder(clusterNames, zkServerWrapper, pubSubBrokerWrapper)
.regionName(options.getRegionName())
Expand All @@ -127,6 +129,7 @@ static ServiceProvider<VeniceMultiClusterWrapper> generateService(VeniceMultiClu
Properties extraProperties = options.getVeniceProperties().toProperties();
extraProperties.put(SYSTEM_SCHEMA_CLUSTER_NAME, clusterNames[0]);
extraProperties.putAll(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));
pubBrokerDetails.forEach((key, value) -> extraProperties.putIfAbsent(key, value));
VeniceClusterCreateOptions.Builder vccBuilder =
new VeniceClusterCreateOptions.Builder().regionName(options.getRegionName())
.standalone(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,16 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));
}

// Add additional config from PubSubBrokerWrapper to server.properties iff the key is not already present
Map<String, String> brokerDetails = pubSubBrokerWrapper.getAdditionalConfig();
for (Map.Entry<String, String> entry: brokerDetails.entrySet()) {
if (clusterProps.containsKey(entry.getKey())) {
// skip if the key is already present in cluster.properties
continue;
}
serverPropsBuilder.putIfAbsent(entry.getKey(), entry.getValue());
}

VeniceProperties serverProps = serverPropsBuilder.build();

File serverConfigFile = new File(configDirectory, VeniceConfigLoader.SERVER_PROPERTIES_FILE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,16 @@ static ServiceProvider<VeniceTwoLayerMultiRegionMultiClusterWrapper> generateSer
Map<String, Map<String, String>> kafkaClusterMap =
addKafkaClusterIDMappingToServerConfigs(serverProperties, childRegionName, allPubSubBrokerWrappers);

Map<String, String> pubSubBrokerProps = PubSubBrokerWrapper.combineAdditionalConfigs(allPubSubBrokerWrappers);
finalParentControllerProperties.putAll(pubSubBrokerProps); // parent controllers
finalChildControllerProperties.putAll(pubSubBrokerProps); // child controllers

Properties additionalServerProps = new Properties();
serverProperties
.ifPresent(veniceProperties -> additionalServerProps.putAll(veniceProperties.getPropertiesCopy()));
additionalServerProps.putAll(pubSubBrokerProps);
serverProperties = Optional.of(new VeniceProperties(additionalServerProps));

VeniceMultiClusterCreateOptions.Builder builder =
new VeniceMultiClusterCreateOptions.Builder(numberOfClustersInEachRegion)
.numberOfControllers(numberOfControllers)
Expand Down Expand Up @@ -252,7 +262,7 @@ static ServiceProvider<VeniceTwoLayerMultiRegionMultiClusterWrapper> generateSer
false,
VeniceControllerWrapper.PARENT_D2_CLUSTER_NAME,
VeniceControllerWrapper.PARENT_D2_SERVICE_NAME);
VeniceControllerCreateOptions options =
VeniceControllerCreateOptions parentControllerCreateOptions =
new VeniceControllerCreateOptions.Builder(clusterNames, zkServer, parentPubSubBrokerWrapper)
.replicationFactor(replicationFactor)
.childControllers(childControllers)
Expand All @@ -264,7 +274,7 @@ static ServiceProvider<VeniceTwoLayerMultiRegionMultiClusterWrapper> generateSer
// Create parentControllers for multi-cluster
for (int i = 0; i < numberOfParentControllers; i++) {
// random controller from each multi-cluster, in reality this should include all controllers, not just one
VeniceControllerWrapper parentController = ServiceFactory.getVeniceController(options);
VeniceControllerWrapper parentController = ServiceFactory.getVeniceController(parentControllerCreateOptions);
parentControllers.add(parentController);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,4 +840,15 @@ public static int getFreePort() {
throw new RuntimeException(e);
}
}

public static Map<String, String> combineConfigs(List<Map<String, String>> configMaps) {
Map<String, String> aggregateConfigMap = new HashMap<>(2);

for (Map<String, String> configMap: configMaps) {
for (Map.Entry<String, String> entry: configMap.entrySet()) {
aggregateConfigMap.compute(entry.getKey(), (k, v) -> v == null ? entry.getValue() : v + "," + entry.getValue());
}
}
return aggregateConfigMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.meta.StoreInfo;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.testng.annotations.Test;


Expand All @@ -26,4 +32,43 @@ public void testVerifyDCConfigNativeAndActiveRepl() {
verify(storeInfo).isNativeReplicationEnabled();
verify(storeInfo).isActiveActiveReplicationEnabled();
}

@Test
public void testCombineConfigs() {
Map<String, String> combinedConfigs = TestUtils.combineConfigs(
Arrays.asList(
getAdditionalConfigsForRegion(0),
getAdditionalConfigsForRegion(1),
getAdditionalConfigsForRegion(2)));
assertNotNull(combinedConfigs);
assertEquals(combinedConfigs.size(), 2);

Properties properties = new Properties();
properties.putAll(combinedConfigs);
VeniceProperties veniceProperties = new VeniceProperties(properties);

Map<String, String> regionToBrokerUrlMap = veniceProperties.getMap("pubsub.region.to.broker.url");
assertEquals(regionToBrokerUrlMap.size(), 3);
assertEquals(regionToBrokerUrlMap.get("dc0"), "dc0.pubsub.com:9090");
assertEquals(regionToBrokerUrlMap.get("dc1"), "dc1.pubsub.com:9091");
assertEquals(regionToBrokerUrlMap.get("dc2"), "dc2.pubsub.com:9092");

Map<String, String> regionToSomeConfigMap = veniceProperties.getMap("pubsub.region.to.some.config");
assertEquals(regionToSomeConfigMap.size(), 3);

assertEquals(regionToSomeConfigMap.get("dc0"), "dc0.some.config.com:8080");
assertEquals(regionToSomeConfigMap.get("dc1"), "dc1.some.config.com:8081");
assertEquals(regionToSomeConfigMap.get("dc2"), "dc2.some.config.com:8082");
}

private Map<String, String> getAdditionalConfigsForRegion(int regionNumber) {
Map<String, String> additionalConfig = new HashMap<>(2);
additionalConfig.put(
"pubsub.region.to.broker.url",
"dc" + regionNumber + ":dc" + regionNumber + ".pubsub.com:909" + regionNumber);
additionalConfig.put(
"pubsub.region.to.some.config",
"dc" + regionNumber + ":dc" + regionNumber + ".some.config.com:808" + regionNumber);
return additionalConfig;
}
}

0 comments on commit 6ef7955

Please sign in to comment.