Skip to content

Commit

Permalink
Avoid assign bundle to previous unloaded broker
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Jul 17, 2023
1 parent 57fbee4 commit f953f89
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,19 @@ public class LoadData {
*/
private final Map<String, Long> recentlyUnloadedBundles;

/**
* Map from recently unloaded bundles to the broker they were unloaded from.
*/
private final Map<String, String> unloadedBundleToBroker;

/**
* Initialize a LoadData.
*/
public LoadData() {
this.brokerData = new ConcurrentHashMap<>();
this.bundleData = new ConcurrentHashMap<>();
this.recentlyUnloadedBundles = new ConcurrentHashMap<>();
this.unloadedBundleToBroker = new ConcurrentHashMap<>();
}

public Map<String, BrokerData> getBrokerData() {
Expand All @@ -72,4 +78,8 @@ public Map<String, BundleData> getBundleDataForLoadShedding() {
public Map<String, Long> getRecentlyUnloadedBundles() {
return recentlyUnloadedBundles;
}

public Map<String, String> getUnloadedBundleToBroker() {
return unloadedBundleToBroker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ public synchronized void doLoadShedding() {
try {
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
loadData.getUnloadedBundleToBroker().put(bundle, broker);
} catch (PulsarServerException | PulsarAdminException e) {
log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e);
}
Expand Down Expand Up @@ -792,6 +793,7 @@ && shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
}

/**
* As leader broker, update bundle split metrics.
* As leader broker, update bundle split metrics.
*
* @param bundlesSplit the number of bundles splits
Expand Down Expand Up @@ -853,16 +855,9 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
brokerCandidateCache,
brokerToNamespaceToBundleRange, brokerToFailureDomainMap);

// distribute bundles evenly to candidate-brokers if enable
if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) {
LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(),
brokerCandidateCache,
brokerToNamespaceToBundleRange);
if (log.isDebugEnabled()) {
log.debug("enable distribute bundles evenly to candidate-brokers, broker candidate count={}",
brokerCandidateCache.size());
}
}
// Do not assign bundles to previous owners to avoid an infinite bundle unloading loop.
brokerCandidateCache.remove(loadData.getUnloadedBundleToBroker().getOrDefault(bundle, ""));

log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);

// Use the filter pipeline to finalize broker candidates.
Expand Down Expand Up @@ -926,6 +921,8 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
}
// Cleanup unloaded bundle cache.
loadData.getUnloadedBundleToBroker().remove(bundle);
return broker;
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,4 +749,34 @@ public void testRemoveDeadBrokerTimeAverageData() throws Exception {
Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getLeaderElectionService().isLeader()));
assertEquals(data.size(), 1);
}

@Test
public void testAvoidAssignBundleToPreviousUnloadedBroker() throws Exception {
assertEquals(primaryLoadManager.getAvailableBrokers().size(), 2);
final LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");

Map<String, String> unloadedBundleToBroker = loadData.getUnloadedBundleToBroker();

final int totalBundles = 50;
final NamespaceBundle[] bundles = LoadBalancerTestingUtils.makeBundles(nsFactory, "test", "test", "test1",
totalBundles);
final BundleData bundleData = new BundleData(10, 1000);
// it sets max topics under this bundle so, owner of this broker reaches max-topic threshold
bundleData.setTopics(pulsar1.getConfiguration().getLoadBalancerBrokerMaxTopics() + 10);
final TimeAverageMessageData longTermMessageData = new TimeAverageMessageData(1000);
longTermMessageData.setMsgRateIn(1000);
bundleData.setLongTermData(longTermMessageData);
final String firstBundleDataPath = String.format("%s/%s", ModularLoadManagerImpl.BUNDLE_DATA_PATH, bundles[0]);
pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class).create(firstBundleDataPath, bundleData).join();
String maxTopicOwnedBroker = primaryLoadManager.selectBrokerForAssignment(bundles[0]).get();

unloadedBundleToBroker.put(bundles[0].toString(), maxTopicOwnedBroker);

final Map<String, String> preallocatedBundleToBroker =
(Map<String, String>) getField(primaryLoadManager, "preallocatedBundleToBroker");
preallocatedBundleToBroker.clear();
assertNotEquals(maxTopicOwnedBroker, primaryLoadManager.selectBrokerForAssignment(bundles[0]).get());

assertTrue(unloadedBundleToBroker.isEmpty());
}
}

0 comments on commit f953f89

Please sign in to comment.