Skip to content

Commit

Permalink
[fix] [broker] Fix configurationMetadataSyncEventTopic is marked supp…
Browse files Browse the repository at this point in the history
…orting dynamic setting, but not implemented (apache#22684)
  • Loading branch information
poorbarcode authored May 10, 2024
1 parent 253e650 commit ff4853e
Show file tree
Hide file tree
Showing 12 changed files with 641 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -607,13 +607,12 @@ public CompletableFuture<Void> closeAsync() {
}
}

closeLocalMetadataStore();
asyncCloseFutures.add(closeLocalMetadataStore());
if (configMetadataSynchronizer != null) {
asyncCloseFutures.add(configMetadataSynchronizer.closeAsync());
}
if (configurationMetadataStore != null && shouldShutdownConfigurationMetadataStore) {
configurationMetadataStore.close();
if (configMetadataSynchronizer != null) {
configMetadataSynchronizer.close();
configMetadataSynchronizer = null;
}
}

if (transactionExecutorProvider != null) {
Expand Down Expand Up @@ -1160,14 +1159,16 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro
.build());
}

protected void closeLocalMetadataStore() throws Exception {
protected CompletableFuture<Void> closeLocalMetadataStore() throws Exception {
if (localMetadataStore != null) {
localMetadataStore.close();
}
if (localMetadataSynchronizer != null) {
localMetadataSynchronizer.close();
CompletableFuture<Void> closeSynchronizer = localMetadataSynchronizer.closeAsync();
localMetadataSynchronizer = null;
return closeSynchronizer;
}
return CompletableFuture.completedFuture(null);
}

protected void startLeaderElectionService() {
Expand Down Expand Up @@ -1928,4 +1929,69 @@ public CompletableFuture<TopicCompactionService> newTopicCompactionService(Strin
return CompletableFuture.failedFuture(e);
}
}

public void initConfigMetadataSynchronizerIfNeeded() {
mutex.lock();
try {
final String newTopic = config.getConfigurationMetadataSyncEventTopic();
final PulsarMetadataEventSynchronizer oldSynchronizer = configMetadataSynchronizer;
// Skip if not support.
if (!(configurationMetadataStore instanceof MetadataStoreExtended)) {
LOG.info(
"Skip to update Metadata Synchronizer because of the Configuration Metadata Store using[{}]"
+ " does not support.", configurationMetadataStore.getClass().getName());
return;
}
// Skip if no changes.
// case-1: both null.
// case-2: both topics are the same.
if ((oldSynchronizer == null && StringUtils.isBlank(newTopic))) {
LOG.info("Skip to update Metadata Synchronizer because the topic[null] does not changed.");
}
if (StringUtils.isNotBlank(newTopic) && oldSynchronizer != null) {
TopicName newTopicName = TopicName.get(newTopic);
TopicName oldTopicName = TopicName.get(oldSynchronizer.getTopicName());
if (newTopicName.equals(oldTopicName)) {
LOG.info("Skip to update Metadata Synchronizer because the topic[{}] does not changed.",
oldTopicName);
}
}
// Update(null or not null).
// 1.set the new one.
// 2.close the old one.
// 3.async start the new one.
if (StringUtils.isBlank(newTopic)) {
configMetadataSynchronizer = null;
} else {
configMetadataSynchronizer = new PulsarMetadataEventSynchronizer(this, newTopic);
}
// close the old one and start the new one.
PulsarMetadataEventSynchronizer newSynchronizer = configMetadataSynchronizer;
MetadataStoreExtended metadataStoreExtended = (MetadataStoreExtended) configurationMetadataStore;
metadataStoreExtended.updateMetadataEventSynchronizer(newSynchronizer);
Runnable startNewSynchronizer = () -> {
if (newSynchronizer == null) {
return;
}
try {
newSynchronizer.start();
} catch (Exception e) {
// It only occurs when get internal client fails.
LOG.error("Start Metadata Synchronizer with topic {} failed.",
newTopic, e);
}
};
executor.submit(() -> {
if (oldSynchronizer != null) {
oldSynchronizer.closeAsync().whenComplete((ignore, ex) -> {
startNewSynchronizer.run();
});
} else {
startNewSynchronizer.run();
}
});
} finally {
mutex.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2804,6 +2804,11 @@ private void updateConfigurationAndRegisterListeners() {
pulsar.getWebService().updateHttpRequestsFailOnUnknownPropertiesEnabled((boolean) enabled);
});

// add listener to notify web service httpRequestsFailOnUnknownPropertiesEnabled changed.
registerConfigurationListener("configurationMetadataSyncEventTopic", enabled -> {
pulsar.initConfigMetadataSynchronizerIfNeeded();
});

// add more listeners here

// (3) create dynamic-config if not exist.
Expand Down
Loading

0 comments on commit ff4853e

Please sign in to comment.