Skip to content

Commit

Permalink
[fix] [broker] Fix configurationMetadataSyncEventTopic is marked supp…
Browse files Browse the repository at this point in the history
…orting dynamic setting, but not implemented (apache#22684)

(cherry picked from commit ff4853e)
  • Loading branch information
poorbarcode committed May 10, 2024
1 parent 9b13730 commit 03da743
Show file tree
Hide file tree
Showing 11 changed files with 633 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -568,13 +568,12 @@ public CompletableFuture<Void> closeAsync() {
}
}

closeLocalMetadataStore();
asyncCloseFutures.add(closeLocalMetadataStore());
if (configMetadataSynchronizer != null) {
asyncCloseFutures.add(configMetadataSynchronizer.closeAsync());
}
if (configurationMetadataStore != null && shouldShutdownConfigurationMetadataStore) {
configurationMetadataStore.close();
if (configMetadataSynchronizer != null) {
configMetadataSynchronizer.close();
configMetadataSynchronizer = null;
}
}

if (transactionExecutorProvider != null) {
Expand Down Expand Up @@ -1114,14 +1113,16 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro
.build());
}

protected void closeLocalMetadataStore() throws Exception {
protected CompletableFuture<Void> closeLocalMetadataStore() throws Exception {
if (localMetadataStore != null) {
localMetadataStore.close();
}
if (localMetadataSynchronizer != null) {
localMetadataSynchronizer.close();
CompletableFuture<Void> closeSynchronizer = localMetadataSynchronizer.closeAsync();
localMetadataSynchronizer = null;
return closeSynchronizer;
}
return CompletableFuture.completedFuture(null);
}

protected void startLeaderElectionService() {
Expand Down Expand Up @@ -1935,4 +1936,69 @@ public void shutdownNow() {
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
return new BrokerService(pulsar, ioEventLoopGroup);
}

public void initConfigMetadataSynchronizerIfNeeded() {
mutex.lock();
try {
final String newTopic = config.getConfigurationMetadataSyncEventTopic();
final PulsarMetadataEventSynchronizer oldSynchronizer = configMetadataSynchronizer;
// Skip if not support.
if (!(configurationMetadataStore instanceof MetadataStoreExtended)) {
LOG.info(
"Skip to update Metadata Synchronizer because of the Configuration Metadata Store using[{}]"
+ " does not support.", configurationMetadataStore.getClass().getName());
return;
}
// Skip if no changes.
// case-1: both null.
// case-2: both topics are the same.
if ((oldSynchronizer == null && StringUtils.isBlank(newTopic))) {
LOG.info("Skip to update Metadata Synchronizer because the topic[null] does not changed.");
}
if (StringUtils.isNotBlank(newTopic) && oldSynchronizer != null) {
TopicName newTopicName = TopicName.get(newTopic);
TopicName oldTopicName = TopicName.get(oldSynchronizer.getTopicName());
if (newTopicName.equals(oldTopicName)) {
LOG.info("Skip to update Metadata Synchronizer because the topic[{}] does not changed.",
oldTopicName);
}
}
// Update(null or not null).
// 1.set the new one.
// 2.close the old one.
// 3.async start the new one.
if (StringUtils.isBlank(newTopic)) {
configMetadataSynchronizer = null;
} else {
configMetadataSynchronizer = new PulsarMetadataEventSynchronizer(this, newTopic);
}
// close the old one and start the new one.
PulsarMetadataEventSynchronizer newSynchronizer = configMetadataSynchronizer;
MetadataStoreExtended metadataStoreExtended = (MetadataStoreExtended) configurationMetadataStore;
metadataStoreExtended.updateMetadataEventSynchronizer(newSynchronizer);
Runnable startNewSynchronizer = () -> {
if (newSynchronizer == null) {
return;
}
try {
newSynchronizer.start();
} catch (Exception e) {
// It only occurs when get internal client fails.
LOG.error("Start Metadata Synchronizer with topic {} failed.",
newTopic, e);
}
};
executor.submit(() -> {
if (oldSynchronizer != null) {
oldSynchronizer.closeAsync().whenComplete((ignore, ex) -> {
startNewSynchronizer.run();
});
} else {
startNewSynchronizer.run();
}
});
} finally {
mutex.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2827,6 +2827,11 @@ private void updateConfigurationAndRegisterListeners() {
pulsar.getWebService().updateHttpRequestsFailOnUnknownPropertiesEnabled((boolean) enabled);
});

// add listener to notify web service httpRequestsFailOnUnknownPropertiesEnabled changed.
registerConfigurationListener("configurationMetadataSyncEventTopic", enabled -> {
pulsar.initConfigMetadataSynchronizerIfNeeded();
});

// add more listeners here

// (3) create dynamic-config if not exist.
Expand Down
Loading

0 comments on commit 03da743

Please sign in to comment.