Skip to content

Commit

Permalink
[fix][broker][branch-3.0] Do not try to clean owned bundles from inac…
Browse files Browse the repository at this point in the history
…tive source brokers (ExtensibleLoadManagerImpl only) (apache#23064) (apache#23077)

(cherry picked from commit 55d32f2)
  • Loading branch information
heesung-sn authored and nikhil-ctds committed Jul 29, 2024
1 parent 40a797c commit a34a936
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

private static final Set<String> INTERNAL_TOPICS =
Set.of(BROKER_LOAD_DATA_STORE_TOPIC, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TOPIC);

private PulsarService pulsar;

private ServiceConfiguration conf;
Expand Down Expand Up @@ -774,7 +777,8 @@ public void close() throws PulsarServerException {
}

public static boolean isInternalTopic(String topic) {
return topic.startsWith(TOPIC)
return INTERNAL_TOPICS.contains(topic)
|| topic.startsWith(TOPIC)
|| topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}
Expand Down Expand Up @@ -932,5 +936,26 @@ public void disableBroker() throws Exception {
serviceUnitStateChannel.cleanOwnerships();
leaderElectionService.close();
brokerRegistry.unregister();
// Close the internal topics (if owned any) after giving up the possible leader role,
// so that the subsequent lookups could hit the next leader.
closeInternalTopics();
}

private void closeInternalTopics() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String name : INTERNAL_TOPICS) {
futures.add(pulsar.getBrokerService().getTopicIfExists(name)
.thenAccept(topicOptional -> topicOptional.ifPresent(topic -> topic.close(true)))
.exceptionally(__ -> {
log.warn("Failed to close internal topic:{}", name);
return null;
}));
}
try {
FutureUtil.waitForAll(futures)
.get(pulsar.getConfiguration().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (Throwable e) {
log.warn("Failed to wait for closing internal topics", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -611,20 +611,13 @@ public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, Str
}

private CompletableFuture<Void> publishOverrideEventAsync(String serviceUnit,
ServiceUnitStateData orphanData,
ServiceUnitStateData override) {
if (!validateChannelState(Started, true)) {
throw new IllegalStateException("Invalid channel state:" + channelState.name());
}
EventType eventType = EventType.Override;
eventCounters.get(eventType).getTotal().incrementAndGet();
return pubAsync(serviceUnit, override).whenComplete((__, e) -> {
if (e != null) {
eventCounters.get(eventType).getFailure().incrementAndGet();
log.error("Failed to override serviceUnit:{} from orphanData:{} to overrideData:{}",
serviceUnit, orphanData, override, e);
}
}).thenApply(__ -> null);
return pubAsync(serviceUnit, override).thenApply(__ -> null);
}

public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
Expand Down Expand Up @@ -1257,45 +1250,51 @@ private void scheduleCleanup(String broker, long delayInSecs) {
broker, delayInSecs, cleanupJobs.size());
}


private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
Optional<String> selectedBroker,
String inactiveBroker) {


if (selectedBroker.isEmpty()) {
return new ServiceUnitStateData(Free, null, inactiveBroker,
true, getNextVersionId(orphanData));
}

if (orphanData.state() == Splitting) {
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker.get(),
Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
true, getNextVersionId(orphanData));
} else {
return new ServiceUnitStateData(Owned, selectedBroker.get(), inactiveBroker,
true, getNextVersionId(orphanData));
}
}

private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) {
Optional<String> selectedBroker = selectBroker(serviceUnit, inactiveBroker);
if (selectedBroker.isEmpty()) {
log.warn("Empty selected broker for ownership serviceUnit:{} orphanData:{}."
final var version = getNextVersionId(orphanData);
try {
selectBroker(serviceUnit, inactiveBroker)
.thenApply(selectedOpt ->
selectedOpt.map(selectedBroker -> {
if (orphanData.state() == Splitting) {
// if Splitting, set orphan.dstBroker() as dst to indicate where it was from.
// (The src broker runs handleSplitEvent.)
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, version);
} else if (orphanData.state() == Owned) {
// if Owned, set orphan.dstBroker() as source to clean it up in case it is still
// alive.
return new ServiceUnitStateData(Owned, selectedBroker,
selectedBroker.equals(orphanData.dstBroker()) ? null :
orphanData.dstBroker(),
true, version);
} else {
// if Assigning or Releasing, set orphan.sourceBroker() as source
// to clean it up in case it is still alive.
return new ServiceUnitStateData(Owned, selectedBroker,
selectedBroker.equals(orphanData.sourceBroker()) ? null :
orphanData.sourceBroker(),
true, version);
}
// If no broker is selected(available), free the ownership.
// If the previous owner is still active, it will close the bundle(topic) ownership.
}).orElseGet(() -> new ServiceUnitStateData(Free, null,
orphanData.state() == Owned ? orphanData.dstBroker() : orphanData.sourceBroker(),
true,
version)))
.thenCompose(override -> {
log.info(
"Overriding inactiveBroker:{}, ownership serviceUnit:{} from orphanData:{} to "
+ "overrideData:{}",
inactiveBroker, serviceUnit, orphanData, override);
return publishOverrideEventAsync(serviceUnit, override);
}).get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (Throwable e) {
log.error(
"Failed to override inactiveBroker:{} ownership serviceUnit:{} orphanData:{}. "
+ "totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
inactiveBroker, serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet(), e);
}
var override = getOverrideInactiveBrokerStateData(orphanData, selectedBroker, inactiveBroker);
log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
serviceUnit, orphanData, override);
publishOverrideEventAsync(serviceUnit, orphanData, override)
.exceptionally(e -> {
log.error(
"Failed to override the ownership serviceUnit:{} orphanData:{}. "
+ "Failed to publish override event. totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
return null;
});
}

private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
Expand Down Expand Up @@ -1411,61 +1410,14 @@ private synchronized void doCleanup(String broker) {

}

private Optional<String> selectBroker(String serviceUnit, String inactiveBroker) {
try {
return loadManager.selectAsync(
LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit),
Set.of(inactiveBroker), LookupOptions.builder().build())
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
} catch (Throwable e) {
log.error("Failed to select a broker for serviceUnit:{}", serviceUnit);
}
return Optional.empty();
}

private Optional<ServiceUnitStateData> getRollForwardStateData(String serviceUnit,
String inactiveBroker,
long nextVersionId) {
Optional<String> selectedBroker = selectBroker(serviceUnit, inactiveBroker);
if (selectedBroker.isEmpty()) {
return Optional.empty();
}
return Optional.of(new ServiceUnitStateData(Owned, selectedBroker.get(), true, nextVersionId));
private CompletableFuture<Optional<String>> selectBroker(String serviceUnit, String inactiveBroker) {
return getLoadManager().selectAsync(
LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit),
inactiveBroker == null ? Set.of() : Set.of(inactiveBroker),
LookupOptions.builder().build());
}


private Optional<ServiceUnitStateData> getOverrideInFlightStateData(
String serviceUnit, ServiceUnitStateData orphanData,
Set<String> availableBrokers) {
long nextVersionId = getNextVersionId(orphanData);
var state = orphanData.state();
switch (state) {
case Assigning: {
return getRollForwardStateData(serviceUnit, orphanData.dstBroker(), nextVersionId);
}
case Splitting: {
return Optional.of(new ServiceUnitStateData(Splitting,
orphanData.dstBroker(), orphanData.sourceBroker(),
Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
true, nextVersionId));
}
case Releasing: {
if (availableBrokers.contains(orphanData.sourceBroker())) {
// rollback to the src
return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true, nextVersionId));
} else {
return getRollForwardStateData(serviceUnit, orphanData.sourceBroker(), nextVersionId);
}
}
default: {
var msg = String.format("Failed to get the overrideStateData from serviceUnit=%s, orphanData=%s",
serviceUnit, orphanData);
log.error(msg);
throw new IllegalStateException(msg);
}
}
}

@VisibleForTesting
protected void monitorOwnerships(List<String> brokers) {
if (!isChannelOwner()) {
Expand All @@ -1492,7 +1444,7 @@ protected void monitorOwnerships(List<String> brokers) {
long startTime = System.nanoTime();
Set<String> inactiveBrokers = new HashSet<>();
Set<String> activeBrokers = new HashSet<>(brokers);
Map<String, ServiceUnitStateData> orphanServiceUnits = new HashMap<>();
Map<String, ServiceUnitStateData> timedOutInFlightStateServiceUnits = new HashMap<>();
int serviceUnitTombstoneCleanupCnt = 0;
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
Expand All @@ -1504,20 +1456,27 @@ protected void monitorOwnerships(List<String> brokers) {
String srcBroker = stateData.sourceBroker();
var state = stateData.state();

if (isActiveState(state) && StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) {
if (state == Owned && (StringUtils.isBlank(dstBroker) || !activeBrokers.contains(dstBroker))) {
inactiveBrokers.add(dstBroker);
continue;
}

if (isInFlightState(state) && StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) {
inactiveBrokers.add(srcBroker);
continue;
}
if (isActiveState(state) && StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) {
if (isInFlightState(state) && StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) {
inactiveBrokers.add(dstBroker);
continue;
}
if (isActiveState(state) && isInFlightState(state)

if (isInFlightState(state)
&& now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) {
orphanServiceUnits.put(serviceUnit, stateData);
timedOutInFlightStateServiceUnits.put(serviceUnit, stateData);
continue;
}


if (!isActiveState(state) && now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) {
log.info("Found semi-terminal states to tombstone"
+ " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
Expand All @@ -1533,37 +1492,21 @@ protected void monitorOwnerships(List<String> brokers) {
}
}

// Skip cleaning orphan bundles if inactiveBrokers exist. This is a bigger problem.

if (!inactiveBrokers.isEmpty()) {
for (String inactiveBroker : inactiveBrokers) {
handleBrokerDeletionEvent(inactiveBroker);
}
} else if (!orphanServiceUnits.isEmpty()) {
for (var etr : orphanServiceUnits.entrySet()) {
}

// timedOutInFlightStateServiceUnits are the in-flight ones although their src and dst brokers are known to
// be active.
if (!timedOutInFlightStateServiceUnits.isEmpty()) {
for (var etr : timedOutInFlightStateServiceUnits.entrySet()) {
var orphanServiceUnit = etr.getKey();
var orphanData = etr.getValue();
var overrideData = getOverrideInFlightStateData(
orphanServiceUnit, orphanData, activeBrokers);
if (overrideData.isPresent()) {
log.info("Overriding in-flight state ownership serviceUnit:{} "
+ "from orphanData:{} to overrideData:{}",
orphanServiceUnit, orphanData, overrideData);
publishOverrideEventAsync(orphanServiceUnit, orphanData, overrideData.get())
.whenComplete((__, e) -> {
if (e != null) {
log.error("Failed cleaning the ownership orphanServiceUnit:{}, orphanData:{}, "
+ "cleanupErrorCnt:{}.",
orphanServiceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
}
});
orphanServiceUnitCleanupCnt++;
} else {
log.warn("Failed get the overrideStateData from orphanServiceUnit:{}, orphanData:{},"
+ " cleanupErrorCnt:{}. will retry..",
orphanServiceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart);
}
overrideOwnership(orphanServiceUnit, orphanData, null);
orphanServiceUnitCleanupCnt++;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2294,6 +2294,15 @@ private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit
topics.forEach((name, topicFuture) -> {
TopicName topicName = TopicName.get(name);
if (serviceUnit.includes(topicName)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
&& ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
if (ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log)) {
log.info("[{}] Skip unloading ExtensibleLoadManager internal topics. Such internal topic "
+ "should be closed when shutting down the broker.", topicName);
}
return;
}

// Topic needs to be unloaded
log.info("[{}] Unloading topic", topicName);
if (topicFuture.isCompletedExceptionally()) {
Expand Down
Loading

0 comments on commit a34a936

Please sign in to comment.