Skip to content

Commit

Permalink
[fix][broker] fix ModularLoadManagerImpl always delete active bundle-…
Browse files Browse the repository at this point in the history
…data. sec ver. (apache#20620)

Co-authored-by: wangjinlong <[email protected]>
(cherry picked from commit f2f0bf4)
  • Loading branch information
lifepuzzlefun authored and nodece committed Jul 24, 2024
1 parent dd92407 commit b81dd71
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -565,17 +565,6 @@ private void updateBundleData() {
bundleData.put(bundle, currentBundleData);
}
}

//Remove not active bundle from loadData
for (String bundle : bundleData.keySet()) {
if (!activeBundles.contains(bundle)){
bundleData.remove(bundle);
if (pulsar.getLeaderElectionService().isLeader()){
deleteBundleDataFromMetadataStore(bundle);
}
}
}

// Remove all loaded bundles from the preallocated maps.
final Map<String, BundleData> preallocatedBundleData = brokerData.getPreallocatedBundleData();
Set<String> ownedNsBundles = pulsar.getNamespaceService().getOwnedServiceUnits()
Expand Down Expand Up @@ -610,6 +599,16 @@ private void updateBundleData() {
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), namespaceToBundleRange);
}
}

// Remove not active bundle from loadData
for (String bundle : bundleData.keySet()) {
if (!activeBundles.contains(bundle)){
bundleData.remove(bundle);
if (pulsar.getLeaderElectionService().isLeader()){
deleteBundleDataFromMetadataStore(bundle);
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -64,17 +66,22 @@
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.CreateOption;
Expand Down Expand Up @@ -716,4 +723,116 @@ public void testRemoveDeadBrokerTimeAverageData() throws Exception {


}


@Test
public void testRemoveNonExistBundleData()
throws PulsarAdminException, InterruptedException,
PulsarClientException, PulsarServerException, NoSuchFieldException, IllegalAccessException {
final String cluster = "use";
final String tenant = "my-tenant";
final String namespace = "remove-non-exist-bundle-data-ns";
final String topicName = tenant + "/" + namespace + "/" + "topic";
int bundleNumbers = 8;

admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(tenant + "/" + namespace, bundleNumbers);

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();

// create a lot of topic to fully distributed among bundles.
for (int i = 0; i < 10; i++) {
String topicNameI = topicName + i;
admin1.topics().createPartitionedTopic(topicNameI, 20);
// trigger bundle assignment

pulsarClient.newConsumer().topic(topicNameI)
.subscriptionName("my-subscriber-name2").subscribe();
}

ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
ModularLoadManagerImpl lm1 = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager();
ModularLoadManagerWrapper loadManager2 = (ModularLoadManagerWrapper) pulsar2.getLoadManager().get();
ModularLoadManagerImpl lm2 = (ModularLoadManagerImpl) loadManager2.getLoadManager();

Field executors = lm1.getClass().getDeclaredField("executors");
executors.setAccessible(true);
ExecutorService executorService = (ExecutorService) executors.get(lm1);

assertEquals(lm1.getAvailableBrokers().size(), 2);

pulsar1.getBrokerService().updateRates();
pulsar2.getBrokerService().updateRates();

lm1.writeBrokerDataOnZooKeeper(true);
lm2.writeBrokerDataOnZooKeeper(true);

// wait for metadata store notification finish
CountDownLatch latch = new CountDownLatch(1);
executorService.submit(latch::countDown);
latch.await();

loadManagerWrapper.writeResourceQuotasToZooKeeper();

MetadataCache<BundleData> bundlesCache = pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class);

// trigger bundle split
String topicToFindBundle = topicName + 0;
NamespaceBundle bundleWillBeSplit = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));

String bundleDataPath = ModularLoadManagerImpl.BUNDLE_DATA_PATH + "/" + tenant + "/" + namespace;
CompletableFuture<List<String>> children = bundlesCache.getChildren(bundleDataPath);
List<String> bundles = children.join();
assertTrue(bundles.contains(bundleWillBeSplit.getBundleRange()));

// after updateAll no namespace bundle data is deleted from metadata store.
lm1.updateAll();

children = bundlesCache.getChildren(bundleDataPath);
bundles = children.join();
assertFalse(bundles.isEmpty());
assertEquals(bundleNumbers, bundles.size());

NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
pulsar1.getAdminClient().namespaces().splitNamespaceBundle(tenant + "/" + namespace,
bundleWillBeSplit.getBundleRange(),
false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);

NamespaceBundles allBundlesAfterSplit =
pulsar1.getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);

assertFalse(allBundlesAfterSplit.getBundles().contains(bundleWillBeSplit));

// the bundle data should be deleted

pulsar1.getBrokerService().updateRates();
pulsar2.getBrokerService().updateRates();

lm1.writeBrokerDataOnZooKeeper(true);
lm2.writeBrokerDataOnZooKeeper(true);

latch = new CountDownLatch(1);
// wait for metadata store notification finish
CountDownLatch finalLatch = latch;
executorService.submit(finalLatch::countDown);
latch.await();

loadManagerWrapper.writeResourceQuotasToZooKeeper();

lm1.updateAll();

log.info("update all triggered.");

// check bundle data should be deleted from metadata store.

CompletableFuture<List<String>> childrenAfterSplit = bundlesCache.getChildren(bundleDataPath);
List<String> bundlesAfterSplit = childrenAfterSplit.join();

assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange()));
}

}

0 comments on commit b81dd71

Please sign in to comment.