Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Create cluster if it doesn't exist during metadata initialisation (#445)
Browse files Browse the repository at this point in the history
This case is usually encountered when attempting to use KoP with standalone mode. When new behavior was introduced in #168 standalone mode was broken due to sample namespace setup etc not taking place until after the broker is initialized (which in turn initializes protocol handlers).

Fixes #185
  • Loading branch information
josephglanville authored Apr 18, 2021
1 parent c4d1892 commit 5ee6bf0
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.FutureUtil;

/**
Expand Down Expand Up @@ -370,7 +371,11 @@ public void initGroupCoordinator(BrokerService service) throws Exception {
.build();

PulsarAdmin pulsarAdmin = service.pulsar().getAdminClient();
MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, kafkaConfig);
ClusterData clusterData = new ClusterData(service.getPulsar().getWebServiceAddress(),
service.getPulsar().getWebServiceAddressTls(),
service.getPulsar().getBrokerServiceUrl(),
service.getPulsar().getBrokerServiceUrlTls());
MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig);


this.groupCoordinator = GroupCoordinator.of(
Expand Down Expand Up @@ -403,7 +408,12 @@ public void initTransactionCoordinator() throws Exception {
.build();

PulsarAdmin pulsarAdmin = brokerService.getPulsar().getAdminClient();
MetadataUtils.createTxnMetadataIfMissing(pulsarAdmin, kafkaConfig);
ClusterData clusterData = new ClusterData(brokerService.getPulsar().getWebServiceAddress(),
brokerService.getPulsar().getWebServiceAddressTls(),
brokerService.getPulsar().getBrokerServiceUrl(),
brokerService.getPulsar().getBrokerServiceUrlTls());

MetadataUtils.createTxnMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig);

this.transactionCoordinator = TransactionCoordinator.of(
transactionConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.common.internals.Topic;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -56,16 +55,20 @@ public static String constructTxnLogTopicBaseName(KafkaServiceConfiguration conf
+ "/" + Topic.TRANSACTION_STATE_TOPIC_NAME;
}

public static void createOffsetMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServiceConfiguration conf)
throws PulsarServerException, PulsarAdminException {
public static void createOffsetMetadataIfMissing(PulsarAdmin pulsarAdmin,
ClusterData clusterData,
KafkaServiceConfiguration conf)
throws PulsarAdminException {
KopTopic kopTopic = new KopTopic(constructOffsetsTopicBaseName(conf));
createKafkaMetadataIfMissing(pulsarAdmin, conf, kopTopic, conf.getOffsetsTopicNumPartitions());
createKafkaMetadataIfMissing(pulsarAdmin, clusterData, conf, kopTopic, conf.getOffsetsTopicNumPartitions());
}

public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServiceConfiguration conf)
throws PulsarServerException, PulsarAdminException {
public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin,
ClusterData clusterData,
KafkaServiceConfiguration conf)
throws PulsarAdminException {
KopTopic kopTopic = new KopTopic(constructTxnLogTopicBaseName(conf));
createKafkaMetadataIfMissing(pulsarAdmin, conf, kopTopic, conf.getTxnLogTopicNumPartitions());
createKafkaMetadataIfMissing(pulsarAdmin, clusterData, conf, kopTopic, conf.getTxnLogTopicNumPartitions());
}

/**
Expand All @@ -83,10 +86,11 @@ public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServ
* </ul>
*/
private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin,
KafkaServiceConfiguration conf,
KopTopic kopTopic,
int partitionNum)
throws PulsarServerException, PulsarAdminException {
ClusterData clusterData,
KafkaServiceConfiguration conf,
KopTopic kopTopic,
int partitionNum)
throws PulsarAdminException {
String cluster = conf.getClusterName();
String kafkaMetadataTenant = conf.getKafkaMetadataTenant();
String kafkaMetadataNamespace = kafkaMetadataTenant + "/" + conf.getKafkaMetadataNamespace();
Expand All @@ -97,12 +101,21 @@ private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin,
try {
Clusters clusters = pulsarAdmin.clusters();
if (!clusters.getClusters().contains(cluster)) {
throw new PulsarServerException.NotFoundException("Configured cluster does not exist");
try {
pulsarAdmin.clusters().createCluster(cluster, clusterData);
} catch (PulsarAdminException e) {
if (e instanceof ConflictException) {
log.info("Attempted to create cluster {} however it was created concurrently.", cluster);
} else {
// Re-throw all other exceptions
throw e;
}
}
} else {
ClusterData configuredClusterData = clusters.getCluster(cluster);
log.info("Cluster {} found: {}", cluster, configuredClusterData);
clusterExists = true;
}
clusterExists = true;

// Check if the metadata tenant exists and create it if not
Tenants tenants = pulsarAdmin.tenants();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.annotations.Test;
Expand All @@ -50,6 +51,7 @@ public class MetadataUtilsTest {
public void testCreateKafkaMetadataIfMissing() throws Exception {
KopTopic.initialize("public/default");
KafkaServiceConfiguration conf = new KafkaServiceConfiguration();
ClusterData clusterData = new ClusterData();
conf.setClusterName("test");
conf.setKafkaMetadataTenant("public");
conf.setKafkaMetadataNamespace("default");
Expand Down Expand Up @@ -86,7 +88,7 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
TenantInfo partialTenant = new TenantInfo();
doReturn(partialTenant).when(mockTenants).getTenantInfo(eq(conf.getKafkaMetadataTenant()));

MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, conf);
MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, clusterData, conf);

// After call the createOffsetMetadataIfMissing, these methods should return expected data.
doReturn(Lists.newArrayList(conf.getKafkaMetadataTenant())).when(mockTenants).getTenants();
Expand All @@ -95,7 +97,7 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
doReturn(Lists.newArrayList(conf.getClusterName())).when(mockNamespaces)
.getNamespaceReplicationClusters(eq(namespace));

MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, conf);
MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, clusterData, conf);

verify(mockTenants, times(1)).createTenant(eq(conf.getKafkaMetadataTenant()), any(TenantInfo.class));
verify(mockNamespaces, times(1)).createNamespace(eq(conf.getKafkaMetadataTenant() + "/"
Expand Down Expand Up @@ -141,8 +143,8 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
doReturn(incompletePartitionList).when(mockTopics).getList(eq(conf.getKafkaMetadataTenant()
+ "/" + conf.getKafkaMetadataNamespace()));

MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, conf);
MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, conf);
MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, clusterData, conf);
MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, clusterData, conf);

verify(mockTenants, times(1)).updateTenant(eq(conf.getKafkaMetadataTenant()), any(TenantInfo.class));
verify(mockNamespaces, times(2)).setNamespaceReplicationClusters(eq(conf.getKafkaMetadataTenant()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
Expand Down Expand Up @@ -231,6 +232,8 @@ protected final void init() throws Exception {
String brokerServiceUrl = "pulsar://" + this.conf.getAdvertisedAddress() + ":" + brokerPort;
String brokerServiceUrlTls = null; // TLS not supported at this time

ClusterData clusterData = new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, null);

mockZooKeeper = createMockZooKeeper(configClusterName, serviceUrl, serviceUrlTls, brokerServiceUrl,
brokerServiceUrlTls);
mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor);
Expand All @@ -239,8 +242,8 @@ protected final void init() throws Exception {

createAdmin();

MetadataUtils.createOffsetMetadataIfMissing(admin, this.conf);
MetadataUtils.createTxnMetadataIfMissing(admin, this.conf);
MetadataUtils.createOffsetMetadataIfMissing(admin, clusterData, this.conf);
MetadataUtils.createTxnMetadataIfMissing(admin, clusterData, this.conf);

if (enableSchemaRegistry) {
admin.topics().createPartitionedTopic(KAFKASTORE_TOPIC, 1);
Expand Down

0 comments on commit 5ee6bf0

Please sign in to comment.