diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index c22a4086a639d..1e519b3284fbd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -667,7 +667,8 @@ public CompletableFuture> getOwnershipWithLookupDataA } public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, - Optional destinationBroker) { + Optional destinationBroker, + boolean force) { if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) { log.info("Skip unloading namespace bundle: {}.", bundle); return CompletableFuture.completedFuture(null); @@ -686,7 +687,7 @@ public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, log.warn(msg); throw new IllegalArgumentException(msg); } - Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, true); + Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, force); UnloadDecision unloadDecision = new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin); return unloadAsync(unloadDecision, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 7c91cf927087f..069ac51655141 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -846,6 +846,7 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) { Free, null, data.sourceBroker(), getNextVersionId(data)); unloadFuture = closeServiceUnit(serviceUnit, true); } + // If the optimized bundle unload is disabled, disconnect the clients at time of RELEASE. stateChangeListeners.notifyOnCompletion(unloadFuture .thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, next)); @@ -866,9 +867,12 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { } if (isTargetBroker(data.sourceBroker())) { - stateChangeListeners.notifyOnCompletion( - data.force() ? closeServiceUnit(serviceUnit, true) - : CompletableFuture.completedFuture(0), serviceUnit, data) + // If data.force(), try closeServiceUnit and tombstone the bundle. + CompletableFuture future = + (data.force() ? closeServiceUnit(serviceUnit, true) + .thenCompose(__ -> tombstoneAsync(serviceUnit)) + : CompletableFuture.completedFuture(0)).thenApply(__ -> null); + stateChangeListeners.notifyOnCompletion(future, serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, null)); } else { stateChangeListeners.notify(serviceUnit, data, null); @@ -880,9 +884,13 @@ private void handleDeleteEvent(String serviceUnit, ServiceUnitStateData data) { if (getOwnerRequest != null) { getOwnerRequest.completeExceptionally(new IllegalStateException(serviceUnit + "has been deleted.")); } - stateChangeListeners.notify(serviceUnit, data, null); + if (isTargetBroker(data.sourceBroker())) { - log(null, serviceUnit, data, null); + stateChangeListeners.notifyOnCompletion( + tombstoneAsync(serviceUnit), serviceUnit, data) + .whenComplete((__, e) -> log(e, serviceUnit, data, null)); + } else { + stateChangeListeners.notify(serviceUnit, data, null); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java index 71ebbc92a87db..ac21e4c624163 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java @@ -97,7 +97,7 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable return; } switch (state) { - case Deleted, Owned, Init -> this.complete(serviceUnit, t); + case Init -> this.complete(serviceUnit, t); default -> { if (log.isDebugEnabled()) { log.debug("Handling {} for service unit {}", data, serviceUnit); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java index ffae9475243da..6b745345c0a43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java @@ -175,7 +175,7 @@ public void beforeEvent(String serviceUnit, ServiceUnitStateData data) { public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { ServiceUnitState state = ServiceUnitStateData.state(data); - if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) { + if ((state == Owned || state == Assigning) && StringUtils.isBlank(data.sourceBroker())) { if (log.isDebugEnabled()) { log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit); } @@ -195,7 +195,17 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable } switch (state) { - case Free, Owned -> complete(serviceUnit, t); + case Free -> { + if (!data.force()) { + complete(serviceUnit, t); + } + } + case Init -> { + if (data.force()) { + complete(serviceUnit, t); + } + } + case Owned -> complete(serviceUnit, t); case Releasing -> LatencyMetric.RELEASE.endMeasurement(serviceUnit); case Assigning -> LatencyMetric.ASSIGN.endMeasurement(serviceUnit); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 96936b3a5c05c..80559b736c6ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -837,7 +837,7 @@ public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, boolean closeWithoutWaitingClientDisconnect) { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { return ExtensibleLoadManagerImpl.get(loadManager.get()) - .unloadNamespaceBundleAsync(bundle, destinationBroker); + .unloadNamespaceBundleAsync(bundle, destinationBroker, false); } // unload namespace bundle OwnedBundle ob = ownershipCache.getOwnedBundle(bundle); @@ -1286,7 +1286,8 @@ public CompletableFuture removeOwnedServiceUnitAsync(NamespaceBundle nsBun CompletableFuture future; if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); - future = extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty()); + future = extensibleLoadManager.unloadNamespaceBundleAsync( + nsBundle, Optional.empty(), true); } else { future = ownershipCache.removeOwnership(nsBundle); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 8b96ed04f64de..07855fda4d758 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -739,7 +739,8 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle) @Test(timeOut = 30 * 1000) public void testSplitBundleAdminAPI() throws Exception { - String namespace = defaultTestNamespace; + final String namespace = "public/testSplitBundleAdminAPI"; + admin.namespaces().createNamespace(namespace, 1); Pair topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-split"); TopicName topicName = topicAndBundle.getLeft(); admin.topics().createPartitionedTopic(topicName.toString(), 10); @@ -793,6 +794,30 @@ public boolean test(NamespaceBundle namespaceBundle) { } catch (PulsarAdminException ex) { assertTrue(ex.getMessage().contains("Invalid bundle range")); } + + + // delete and retry + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + admin.namespaces().deleteNamespace(namespace); + }); + admin.namespaces().createNamespace(namespace, 1); + admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null); + + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + BundlesData bundlesData = admin.namespaces().getBundles(namespace); + assertEquals(bundlesData.getNumBundles(), numBundles + 1); + String lowBundle = String.format("0x%08x", bundleRanges.get(0)); + String midBundle = String.format("0x%08x", mid); + String highBundle = String.format("0x%08x", bundleRanges.get(1)); + assertTrue(bundlesData.getBoundaries().contains(lowBundle)); + assertTrue(bundlesData.getBoundaries().contains(midBundle)); + assertTrue(bundlesData.getBoundaries().contains(highBundle)); + assertEquals(splitCount.get(), 2); + }); } @Test(timeOut = 30 * 1000) @@ -1656,7 +1681,11 @@ public void testTryAcquiringOwnership() NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get(); assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl()) .contains(namespaceEphemeralData.getNativeUrl())); - admin.namespaces().deleteNamespace(namespace); + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + admin.namespaces().deleteNamespace(namespace, true); + }); } @Test(timeOut = 30 * 1000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 1076f92037f10..c0fdd95a6a3db 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -576,11 +576,11 @@ public void splitAndRetryTest() throws Exception { childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty())); channel1.publishSplitEventAsync(split); - waitUntilState(channel1, bundle, Deleted); - waitUntilState(channel2, bundle, Deleted); + waitUntilState(channel1, bundle, Init); + waitUntilState(channel2, bundle, Init); - validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0); - validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0); + validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0); + validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0); validateEventCounters(channel1, 1, 0, 1, 0, 0, 0); validateEventCounters(channel2, 0, 0, 0, 0, 0, 0); // Verify the retry count @@ -620,7 +620,7 @@ public void splitAndRetryTest() throws Exception { var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; validateMonitorCounters(leader, 0, - 1, + 0, 0, 0, 0, @@ -1236,15 +1236,15 @@ public void splitTestWhenProducerFails() var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; - waitUntilStateWithMonitor(leader, bundle, Deleted); - waitUntilStateWithMonitor(channel1, bundle, Deleted); - waitUntilStateWithMonitor(channel2, bundle, Deleted); + waitUntilStateWithMonitor(leader, bundle, Init); + waitUntilStateWithMonitor(channel1, bundle, Init); + waitUntilStateWithMonitor(channel2, bundle, Init); var ownerAddr1 = channel1.getOwnerAsync(bundle); var ownerAddr2 = channel2.getOwnerAsync(bundle); - assertTrue(ownerAddr1.isCompletedExceptionally()); - assertTrue(ownerAddr2.isCompletedExceptionally()); + assertTrue(ownerAddr1.get().isEmpty()); + assertTrue(ownerAddr2.get().isEmpty()); FieldUtils.writeDeclaredField(channel1, @@ -1428,13 +1428,15 @@ public void splitAndRetryFailureTest() throws Exception { var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; ((ServiceUnitStateChannelImpl) leader) .monitorOwnerships(List.of(brokerId1, brokerId2)); - waitUntilState(leader, bundle3, Deleted); - waitUntilState(channel1, bundle3, Deleted); - waitUntilState(channel2, bundle3, Deleted); + + waitUntilState(leader, bundle3, Init); + waitUntilState(channel1, bundle3, Init); + waitUntilState(channel2, bundle3, Init); - validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0, 1, 0); - validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 1, 0); + + validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 1, 0, 1, 0); + validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 1, 0, 1, 0); validateEventCounters(channel1, 1, 0, 1, 0, 0, 0); validateEventCounters(channel2, 0, 0, 0, 0, 0, 0); @@ -1464,7 +1466,7 @@ public void splitAndRetryFailureTest() throws Exception { validateMonitorCounters(leader, 0, - 1, + 0, 1, 0, 0, @@ -1542,7 +1544,7 @@ public void testOverrideInactiveBrokerStateData() waitUntilNewOwner(channel2, ownedBundle, brokerId2); assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get()); assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally()); - assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally()); + assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty()); // clean-up FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true); @@ -1605,7 +1607,7 @@ public void testOverrideOrphanStateData() waitUntilNewOwner(channel2, ownedBundle, broker); assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get()); assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally()); - assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally()); + assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty()); // clean-up FieldUtils.writeDeclaredField(channel1, @@ -1663,8 +1665,10 @@ public void testActiveGetOwner() throws Exception { "inFlightStateWaitingTimeInMillis", 20 * 1000, true); start = System.currentTimeMillis(); assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty()); - assertTrue(System.currentTimeMillis() - start < 20_000); + waitUntilState(channel1, bundle, Init); + waitUntilState(channel2, bundle, Init); + assertTrue(System.currentTimeMillis() - start < 20_000); // simulate ownership cleanup(brokerId1 selected owner) by the leader channel overrideTableViews(bundle, new ServiceUnitStateData(Owned, broker, null, 1)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java index 3287306ab48ba..57b7830214b92 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java @@ -123,40 +123,23 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 1); - assertEquals(counter.toMetrics(null).toString(), - counterExpected.toMetrics(null).toString()); manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, VERSION_ID_INIT), null); - counterExpected.update(SplitDecision.Label.Success, Sessions); - assertEquals(inFlightUnloadRequests.size(), 0); - assertEquals(counter.toMetrics(null).toString(), - counterExpected.toMetrics(null).toString()); + assertEquals(inFlightUnloadRequests.size(), 1); - // Success with Init state. - future = manager.waitAsync(CompletableFuture.completedFuture(null), - bundle, decision, 5, TimeUnit.SECONDS); - inFlightUnloadRequests = getinFlightUnloadRequests(manager); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 1); + + // Success with Init state. manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 0); counterExpected.update(SplitDecision.Label.Success, Sessions); assertEquals(counter.toMetrics(null).toString(), counterExpected.toMetrics(null).toString()); - future.get(); - // Success with Owned state. - future = manager.waitAsync(CompletableFuture.completedFuture(null), - bundle, decision, 5, TimeUnit.SECONDS); - inFlightUnloadRequests = getinFlightUnloadRequests(manager); - assertEquals(inFlightUnloadRequests.size(), 1); - manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null); - assertEquals(inFlightUnloadRequests.size(), 0); - counterExpected.update(SplitDecision.Label.Success, Sessions); - assertEquals(counter.toMetrics(null).toString(), - counterExpected.toMetrics(null).toString()); future.get(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java index 5d0abea33577b..f7deb072688c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java @@ -122,11 +122,15 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int assertEquals(inFlightUnloadRequestMap.size(), 1); manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null); + new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, true, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); + // Success with Init state. manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null); + new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, false, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, true, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 0); future.get(); assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1); @@ -136,17 +140,30 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int bundle, unloadDecision, 5, TimeUnit.SECONDS); inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 0); - future.get(); assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2); + + // Success with Free state. + future = manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, unloadDecision, 5, TimeUnit.SECONDS); + inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, true, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, false, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 0); + future.get(); + assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 3); + + } @Test