Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix ModularLoadManagerImpl always delete active bundle-data. sec ver. #20620

Merged
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -558,17 +558,6 @@ private void updateBundleData() {
bundleData.put(bundle, currentBundleData);
}
}

//Remove not active bundle from loadData
for (String bundle : bundleData.keySet()) {
if (!activeBundles.contains(bundle)){
bundleData.remove(bundle);
if (pulsar.getLeaderElectionService().isLeader()){
deleteBundleDataFromMetadataStore(bundle);
}
}
}

// Remove all loaded bundles from the preallocated maps.
final Map<String, BundleData> preallocatedBundleData = brokerData.getPreallocatedBundleData();
Set<String> ownedNsBundles = pulsar.getNamespaceService().getOwnedServiceUnits()
Expand Down Expand Up @@ -603,6 +592,16 @@ private void updateBundleData() {
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), namespaceToBundleRange);
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why this reordering can resolve the issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.
If we need to figure out which bundle is not active, we should get cluster wide all bundle data, right ? To get all cluster wide bundle data, we need iterate all brokerData to collect this.
Current we only iterate the first brokerData and check active bundles. so the all the bundleData which is not owned by the first broker will be cleaned at the first broker check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, maybe I missed some logic here.

But, In this func, I don't see bundleData and activeBundles are updated between these lines. https://github.com/apache/pulsar/pull/20620/files#diff-642d3e26ddad51db8fb736b0f5519f4c929814a4f69d85e16829d19b885be0ebR562-R594 .
So, I am curious to know how this reordering can be effective.

To get all cluster wide bundle data, we need iterate all brokerData to collect this.
Current we only iterate the first brokerData and check active bundles. so the all the bundleData which is not owned by the first broker will be cleaned at the first broker check.

Can you provide the code lines which iterate all brokerData and fill activeBundles and bundleData, other than here?
https://github.com/apache/pulsar/pull/20620/files#diff-642d3e26ddad51db8fb736b0f5519f4c929814a4f69d85e16829d19b885be0ebR549-R558

// Remove not active bundle from loadData
for (String bundle : bundleData.keySet()) {
if (!activeBundles.contains(bundle)){
bundleData.remove(bundle);
if (pulsar.getLeaderElectionService().isLeader()){
deleteBundleDataFromMetadataStore(bundle);
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -62,17 +64,22 @@
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.CreateOption;
Expand Down Expand Up @@ -749,4 +756,116 @@ public void testRemoveDeadBrokerTimeAverageData() throws Exception {
Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getLeaderElectionService().isLeader()));
assertEquals(data.size(), 1);
}


@Test
public void testRemoveNonExistBundleData()
throws PulsarAdminException, InterruptedException,
PulsarClientException, PulsarServerException, NoSuchFieldException, IllegalAccessException {
final String cluster = "use";
final String tenant = "my-tenant";
final String namespace = "remove-non-exist-bundle-data-ns";
final String topicName = tenant + "/" + namespace + "/" + "topic";
int bundleNumbers = 8;

admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(tenant + "/" + namespace, bundleNumbers);

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();

// create a lot of topic to fully distributed among bundles.
for (int i = 0; i < 10; i++) {
String topicNameI = topicName + i;
admin1.topics().createPartitionedTopic(topicNameI, 20);
// trigger bundle assignment

pulsarClient.newConsumer().topic(topicNameI)
.subscriptionName("my-subscriber-name2").subscribe();
}

ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
ModularLoadManagerImpl lm1 = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager();
ModularLoadManagerWrapper loadManager2 = (ModularLoadManagerWrapper) pulsar2.getLoadManager().get();
ModularLoadManagerImpl lm2 = (ModularLoadManagerImpl) loadManager2.getLoadManager();

Field executors = lm1.getClass().getDeclaredField("executors");
executors.setAccessible(true);
ExecutorService executorService = (ExecutorService) executors.get(lm1);

assertEquals(lm1.getAvailableBrokers().size(), 2);

pulsar1.getBrokerService().updateRates();
pulsar2.getBrokerService().updateRates();

lm1.writeBrokerDataOnZooKeeper(true);
lm2.writeBrokerDataOnZooKeeper(true);

// wait for metadata store notification finish
CountDownLatch latch = new CountDownLatch(1);
executorService.submit(latch::countDown);
latch.await();

loadManagerWrapper.writeResourceQuotasToZooKeeper();

MetadataCache<BundleData> bundlesCache = pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class);

// trigger bundle split
String topicToFindBundle = topicName + 0;
NamespaceBundle bundleWillBeSplit = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));

String bundleDataPath = ModularLoadManagerImpl.BUNDLE_DATA_PATH + "/" + tenant + "/" + namespace;
CompletableFuture<List<String>> children = bundlesCache.getChildren(bundleDataPath);
List<String> bundles = children.join();
assertTrue(bundles.contains(bundleWillBeSplit.getBundleRange()));

// after updateAll no namespace bundle data is deleted from metadata store.
lm1.updateAll();

children = bundlesCache.getChildren(bundleDataPath);
bundles = children.join();
assertFalse(bundles.isEmpty());
assertEquals(bundleNumbers, bundles.size());

NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
pulsar1.getAdminClient().namespaces().splitNamespaceBundle(tenant + "/" + namespace,
bundleWillBeSplit.getBundleRange(),
false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);

NamespaceBundles allBundlesAfterSplit =
pulsar1.getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);

assertFalse(allBundlesAfterSplit.getBundles().contains(bundleWillBeSplit));

// the bundle data should be deleted

pulsar1.getBrokerService().updateRates();
pulsar2.getBrokerService().updateRates();

lm1.writeBrokerDataOnZooKeeper(true);
lm2.writeBrokerDataOnZooKeeper(true);

latch = new CountDownLatch(1);
// wait for metadata store notification finish
CountDownLatch finalLatch = latch;
executorService.submit(finalLatch::countDown);
latch.await();

loadManagerWrapper.writeResourceQuotasToZooKeeper();

lm1.updateAll();

log.info("update all triggered.");

// check bundle data should be deleted from metadata store.

CompletableFuture<List<String>> childrenAfterSplit = bundlesCache.getChildren(bundleDataPath);
List<String> bundlesAfterSplit = childrenAfterSplit.join();

assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange()));
}

}