diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index fba0289367e7d..dc57a923c7adc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -122,6 +122,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private static final String ELECTION_ROOT = "/loadbalance/extension/leader"; + private static final Set INTERNAL_TOPICS = + Set.of(BROKER_LOAD_DATA_STORE_TOPIC, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TOPIC); + private PulsarService pulsar; private ServiceConfiguration conf; @@ -774,7 +777,8 @@ public void close() throws PulsarServerException { } public static boolean isInternalTopic(String topic) { - return topic.startsWith(TOPIC) + return INTERNAL_TOPICS.contains(topic) + || topic.startsWith(TOPIC) || topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC) || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); } @@ -932,5 +936,26 @@ public void disableBroker() throws Exception { serviceUnitStateChannel.cleanOwnerships(); leaderElectionService.close(); brokerRegistry.unregister(); + // Close the internal topics (if owned any) after giving up the possible leader role, + // so that the subsequent lookups could hit the next leader. + closeInternalTopics(); + } + + private void closeInternalTopics() { + List> futures = new ArrayList<>(); + for (String name : INTERNAL_TOPICS) { + futures.add(pulsar.getBrokerService().getTopicIfExists(name) + .thenAccept(topicOptional -> topicOptional.ifPresent(topic -> topic.close(true))) + .exceptionally(__ -> { + log.warn("Failed to close internal topic:{}", name); + return null; + })); + } + try { + FutureUtil.waitForAll(futures) + .get(pulsar.getConfiguration().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); + } catch (Throwable e) { + log.warn("Failed to wait for closing internal topics", e); + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 03c77033b0470..02e641f69a745 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -611,20 +611,13 @@ public CompletableFuture publishAssignEventAsync(String serviceUnit, Str } private CompletableFuture publishOverrideEventAsync(String serviceUnit, - ServiceUnitStateData orphanData, ServiceUnitStateData override) { if (!validateChannelState(Started, true)) { throw new IllegalStateException("Invalid channel state:" + channelState.name()); } EventType eventType = EventType.Override; eventCounters.get(eventType).getTotal().incrementAndGet(); - return pubAsync(serviceUnit, override).whenComplete((__, e) -> { - if (e != null) { - eventCounters.get(eventType).getFailure().incrementAndGet(); - log.error("Failed to override serviceUnit:{} from orphanData:{} to overrideData:{}", - serviceUnit, orphanData, override, e); - } - }).thenApply(__ -> null); + return pubAsync(serviceUnit, override).thenApply(__ -> null); } public CompletableFuture publishUnloadEventAsync(Unload unload) { @@ -1257,45 +1250,51 @@ private void scheduleCleanup(String broker, long delayInSecs) { broker, delayInSecs, cleanupJobs.size()); } - - private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData, - Optional selectedBroker, - String inactiveBroker) { - - - if (selectedBroker.isEmpty()) { - return new ServiceUnitStateData(Free, null, inactiveBroker, - true, getNextVersionId(orphanData)); - } - - if (orphanData.state() == Splitting) { - return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker.get(), - Map.copyOf(orphanData.splitServiceUnitToDestBroker()), - true, getNextVersionId(orphanData)); - } else { - return new ServiceUnitStateData(Owned, selectedBroker.get(), inactiveBroker, - true, getNextVersionId(orphanData)); - } - } - private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) { - Optional selectedBroker = selectBroker(serviceUnit, inactiveBroker); - if (selectedBroker.isEmpty()) { - log.warn("Empty selected broker for ownership serviceUnit:{} orphanData:{}." + final var version = getNextVersionId(orphanData); + try { + selectBroker(serviceUnit, inactiveBroker) + .thenApply(selectedOpt -> + selectedOpt.map(selectedBroker -> { + if (orphanData.state() == Splitting) { + // if Splitting, set orphan.dstBroker() as dst to indicate where it was from. + // (The src broker runs handleSplitEvent.) + return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker, + Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, version); + } else if (orphanData.state() == Owned) { + // if Owned, set orphan.dstBroker() as source to clean it up in case it is still + // alive. + return new ServiceUnitStateData(Owned, selectedBroker, + selectedBroker.equals(orphanData.dstBroker()) ? null : + orphanData.dstBroker(), + true, version); + } else { + // if Assigning or Releasing, set orphan.sourceBroker() as source + // to clean it up in case it is still alive. + return new ServiceUnitStateData(Owned, selectedBroker, + selectedBroker.equals(orphanData.sourceBroker()) ? null : + orphanData.sourceBroker(), + true, version); + } + // If no broker is selected(available), free the ownership. + // If the previous owner is still active, it will close the bundle(topic) ownership. + }).orElseGet(() -> new ServiceUnitStateData(Free, null, + orphanData.state() == Owned ? orphanData.dstBroker() : orphanData.sourceBroker(), + true, + version))) + .thenCompose(override -> { + log.info( + "Overriding inactiveBroker:{}, ownership serviceUnit:{} from orphanData:{} to " + + "overrideData:{}", + inactiveBroker, serviceUnit, orphanData, override); + return publishOverrideEventAsync(serviceUnit, override); + }).get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS); + } catch (Throwable e) { + log.error( + "Failed to override inactiveBroker:{} ownership serviceUnit:{} orphanData:{}. " + "totalCleanupErrorCnt:{}", - serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet()); + inactiveBroker, serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet(), e); } - var override = getOverrideInactiveBrokerStateData(orphanData, selectedBroker, inactiveBroker); - log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}", - serviceUnit, orphanData, override); - publishOverrideEventAsync(serviceUnit, orphanData, override) - .exceptionally(e -> { - log.error( - "Failed to override the ownership serviceUnit:{} orphanData:{}. " - + "Failed to publish override event. totalCleanupErrorCnt:{}", - serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet()); - return null; - }); } private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) { @@ -1411,61 +1410,14 @@ private synchronized void doCleanup(String broker) { } - private Optional selectBroker(String serviceUnit, String inactiveBroker) { - try { - return loadManager.selectAsync( - LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit), - Set.of(inactiveBroker), LookupOptions.builder().build()) - .get(inFlightStateWaitingTimeInMillis, MILLISECONDS); - } catch (Throwable e) { - log.error("Failed to select a broker for serviceUnit:{}", serviceUnit); - } - return Optional.empty(); - } - - private Optional getRollForwardStateData(String serviceUnit, - String inactiveBroker, - long nextVersionId) { - Optional selectedBroker = selectBroker(serviceUnit, inactiveBroker); - if (selectedBroker.isEmpty()) { - return Optional.empty(); - } - return Optional.of(new ServiceUnitStateData(Owned, selectedBroker.get(), true, nextVersionId)); + private CompletableFuture> selectBroker(String serviceUnit, String inactiveBroker) { + return getLoadManager().selectAsync( + LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit), + inactiveBroker == null ? Set.of() : Set.of(inactiveBroker), + LookupOptions.builder().build()); } - private Optional getOverrideInFlightStateData( - String serviceUnit, ServiceUnitStateData orphanData, - Set availableBrokers) { - long nextVersionId = getNextVersionId(orphanData); - var state = orphanData.state(); - switch (state) { - case Assigning: { - return getRollForwardStateData(serviceUnit, orphanData.dstBroker(), nextVersionId); - } - case Splitting: { - return Optional.of(new ServiceUnitStateData(Splitting, - orphanData.dstBroker(), orphanData.sourceBroker(), - Map.copyOf(orphanData.splitServiceUnitToDestBroker()), - true, nextVersionId)); - } - case Releasing: { - if (availableBrokers.contains(orphanData.sourceBroker())) { - // rollback to the src - return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true, nextVersionId)); - } else { - return getRollForwardStateData(serviceUnit, orphanData.sourceBroker(), nextVersionId); - } - } - default: { - var msg = String.format("Failed to get the overrideStateData from serviceUnit=%s, orphanData=%s", - serviceUnit, orphanData); - log.error(msg); - throw new IllegalStateException(msg); - } - } - } - @VisibleForTesting protected void monitorOwnerships(List brokers) { if (!isChannelOwner()) { @@ -1492,7 +1444,7 @@ protected void monitorOwnerships(List brokers) { long startTime = System.nanoTime(); Set inactiveBrokers = new HashSet<>(); Set activeBrokers = new HashSet<>(brokers); - Map orphanServiceUnits = new HashMap<>(); + Map timedOutInFlightStateServiceUnits = new HashMap<>(); int serviceUnitTombstoneCleanupCnt = 0; int orphanServiceUnitCleanupCnt = 0; long totalCleanupErrorCntStart = totalCleanupErrorCnt.get(); @@ -1504,20 +1456,27 @@ protected void monitorOwnerships(List brokers) { String srcBroker = stateData.sourceBroker(); var state = stateData.state(); - if (isActiveState(state) && StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) { + if (state == Owned && (StringUtils.isBlank(dstBroker) || !activeBrokers.contains(dstBroker))) { + inactiveBrokers.add(dstBroker); + continue; + } + + if (isInFlightState(state) && StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) { inactiveBrokers.add(srcBroker); continue; } - if (isActiveState(state) && StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) { + if (isInFlightState(state) && StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) { inactiveBrokers.add(dstBroker); continue; } - if (isActiveState(state) && isInFlightState(state) + + if (isInFlightState(state) && now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) { - orphanServiceUnits.put(serviceUnit, stateData); + timedOutInFlightStateServiceUnits.put(serviceUnit, stateData); continue; } + if (!isActiveState(state) && now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) { log.info("Found semi-terminal states to tombstone" + " serviceUnit:{}, stateData:{}", serviceUnit, stateData); @@ -1533,37 +1492,21 @@ protected void monitorOwnerships(List brokers) { } } - // Skip cleaning orphan bundles if inactiveBrokers exist. This is a bigger problem. + if (!inactiveBrokers.isEmpty()) { for (String inactiveBroker : inactiveBrokers) { handleBrokerDeletionEvent(inactiveBroker); } - } else if (!orphanServiceUnits.isEmpty()) { - for (var etr : orphanServiceUnits.entrySet()) { + } + + // timedOutInFlightStateServiceUnits are the in-flight ones although their src and dst brokers are known to + // be active. + if (!timedOutInFlightStateServiceUnits.isEmpty()) { + for (var etr : timedOutInFlightStateServiceUnits.entrySet()) { var orphanServiceUnit = etr.getKey(); var orphanData = etr.getValue(); - var overrideData = getOverrideInFlightStateData( - orphanServiceUnit, orphanData, activeBrokers); - if (overrideData.isPresent()) { - log.info("Overriding in-flight state ownership serviceUnit:{} " - + "from orphanData:{} to overrideData:{}", - orphanServiceUnit, orphanData, overrideData); - publishOverrideEventAsync(orphanServiceUnit, orphanData, overrideData.get()) - .whenComplete((__, e) -> { - if (e != null) { - log.error("Failed cleaning the ownership orphanServiceUnit:{}, orphanData:{}, " - + "cleanupErrorCnt:{}.", - orphanServiceUnit, orphanData, - totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e); - } - }); - orphanServiceUnitCleanupCnt++; - } else { - log.warn("Failed get the overrideStateData from orphanServiceUnit:{}, orphanData:{}," - + " cleanupErrorCnt:{}. will retry..", - orphanServiceUnit, orphanData, - totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart); - } + overrideOwnership(orphanServiceUnit, orphanData, null); + orphanServiceUnitCleanupCnt++; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index de0750b695a94..ca3c9834be0f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2309,6 +2309,15 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit topics.forEach((name, topicFuture) -> { TopicName topicName = TopicName.get(name); if (serviceUnit.includes(topicName)) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar) + && ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) { + if (ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log)) { + log.info("[{}] Skip unloading ExtensibleLoadManager internal topics. Such internal topic " + + "should be closed when shutting down the broker.", topicName); + } + return; + } + // Topic needs to be unloaded log.info("[{}] Unloading topic", topicName); if (topicFuture.isCompletedExceptionally()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index df0dc7eff1b78..24f27a58e761e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -108,6 +108,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { private ServiceUnitStateChannel channel2; private String brokerId1; private String brokerId2; + private String brokerId3; private String bundle; private String bundle1; private String bundle2; @@ -159,6 +160,7 @@ protected void setup() throws Exception { FieldUtils.readDeclaredField(channel1, "brokerId", true); brokerId2 = (String) FieldUtils.readDeclaredField(channel2, "brokerId", true); + brokerId3 = "broker-3"; bundle = "public/default/0x00000000_0xffffffff"; bundle1 = "public/default/0x00000000_0xfffffff0"; @@ -1232,7 +1234,8 @@ public void splitTestWhenProducerFails() var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; - + doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1))) + .when(loadManager).selectAsync(any(), any(), any()); waitUntilStateWithMonitor(leader, bundle, Init); waitUntilStateWithMonitor(channel1, bundle, Init); waitUntilStateWithMonitor(channel2, bundle, Init); @@ -1423,6 +1426,8 @@ public void splitAndRetryFailureTest() throws Exception { assertEquals(3, count.get()); }); var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; + doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1))) + .when(loadManager).selectAsync(any(), any(), any()); ((ServiceUnitStateChannelImpl) leader) .monitorOwnerships(List.of(brokerId1, brokerId2)); @@ -1566,26 +1571,40 @@ public void testOverrideOrphanStateData() String broker = brokerId1; // test override states - String releasingBundle = "public/releasing/0xfffffff0_0xffffffff"; + String releasingBundle1 = "public/releasing1/0xfffffff0_0xffffffff"; + String releasingBundle2 = "public/releasing2/0xfffffff0_0xffffffff"; String splittingBundle = bundle; - String assigningBundle = "public/assigning/0xfffffff0_0xffffffff"; + String assigningBundle1 = "public/assigning1/0xfffffff0_0xffffffff"; + String assigningBundle2 = "public/assigning2/0xfffffff0_0xffffffff"; String freeBundle = "public/free/0xfffffff0_0xffffffff"; String deletedBundle = "public/deleted/0xfffffff0_0xffffffff"; - String ownedBundle = "public/owned/0xfffffff0_0xffffffff"; - overrideTableViews(releasingBundle, - new ServiceUnitStateData(Releasing, null, broker, 1)); + String ownedBundle1 = "public/owned1/0xfffffff0_0xffffffff"; + String ownedBundle2 = "public/owned2SourceBundle/0xfffffff0_0xffffffff"; + String ownedBundle3 = "public/owned3/0xfffffff0_0xffffffff"; + String inactiveBroker = "broker-inactive-1"; + overrideTableViews(releasingBundle1, + new ServiceUnitStateData(Releasing, broker, brokerId2, 1)); + overrideTableViews(releasingBundle2, + new ServiceUnitStateData(Releasing, brokerId2, brokerId3, 1)); overrideTableViews(splittingBundle, new ServiceUnitStateData(Splitting, null, broker, Map.of(childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()), 1)); - overrideTableViews(assigningBundle, + overrideTableViews(assigningBundle1, new ServiceUnitStateData(Assigning, broker, null, 1)); + overrideTableViews(assigningBundle2, + new ServiceUnitStateData(Assigning, broker, brokerId2, 1)); overrideTableViews(freeBundle, new ServiceUnitStateData(Free, null, broker, 1)); overrideTableViews(deletedBundle, new ServiceUnitStateData(Deleted, null, broker, 1)); - overrideTableViews(ownedBundle, + overrideTableViews(ownedBundle1, new ServiceUnitStateData(Owned, broker, null, 1)); + overrideTableViews(ownedBundle2, + new ServiceUnitStateData(Owned, broker, inactiveBroker, 1)); + overrideTableViews(ownedBundle3, + new ServiceUnitStateData(Owned, inactiveBroker, broker, 1)); + // test stable metadata state doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2))) @@ -1595,16 +1614,33 @@ public void testOverrideOrphanStateData() FieldUtils.writeDeclaredField(followerChannel, "inFlightStateWaitingTimeInMillis", -1, true); ((ServiceUnitStateChannelImpl) leaderChannel) - .monitorOwnerships(List.of(brokerId1, brokerId2)); + .monitorOwnerships(List.of(brokerId1, brokerId2, "broker-3")); - waitUntilNewOwner(channel2, releasingBundle, broker); - waitUntilNewOwner(channel2, childBundle11, broker); - waitUntilNewOwner(channel2, childBundle12, broker); - waitUntilNewOwner(channel2, assigningBundle, brokerId2); - waitUntilNewOwner(channel2, ownedBundle, broker); - assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get()); - assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally()); + ServiceUnitStateChannel finalLeaderChannel = leaderChannel; + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> getCleanupJobs(finalLeaderChannel).isEmpty()); + + + waitUntilNewOwner(channel2, releasingBundle1, brokerId2); + waitUntilNewOwner(channel2, releasingBundle2, brokerId2); assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty()); + waitUntilNewOwner(channel2, childBundle11, brokerId2); + waitUntilNewOwner(channel2, childBundle12, brokerId2); + waitUntilNewOwner(channel2, assigningBundle1, brokerId2); + waitUntilNewOwner(channel2, assigningBundle2, brokerId2); + assertTrue(channel2.getOwnerAsync(freeBundle).get().isEmpty()); + assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally()); + waitUntilNewOwner(channel2, ownedBundle1, broker); + waitUntilNewOwner(channel2, ownedBundle2, broker); + waitUntilNewOwner(channel2, ownedBundle3, brokerId2); + + validateMonitorCounters(leaderChannel, + 1, + 0, + 6, + 0, + 1, + 0, + 0); // clean-up FieldUtils.writeDeclaredField(channel1,