From 4dcd4ac41492157374e002977ca34acc29a28c2d Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Mon, 20 May 2024 06:33:51 -0700 Subject: [PATCH] updated the tombstone logic --- .../extensions/ExtensibleLoadManagerImpl.java | 5 ++-- .../channel/ServiceUnitStateChannelImpl.java | 6 ++-- .../extensions/manager/UnloadManager.java | 12 +++++++- .../broker/namespace/NamespaceService.java | 5 ++-- .../channel/ServiceUnitStateChannelTest.java | 23 ++++++++------- .../extensions/manager/UnloadManagerTest.java | 28 +++++++++++++++---- 6 files changed, 55 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index c22a4086a639d..1e519b3284fbd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -667,7 +667,8 @@ public CompletableFuture> getOwnershipWithLookupDataA } public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, - Optional destinationBroker) { + Optional destinationBroker, + boolean force) { if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) { log.info("Skip unloading namespace bundle: {}.", bundle); return CompletableFuture.completedFuture(null); @@ -686,7 +687,7 @@ public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, log.warn(msg); throw new IllegalArgumentException(msg); } - Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, true); + Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, force); UnloadDecision unloadDecision = new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin); return unloadAsync(unloadDecision, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 74e88ab15ff9b..77ab6994b6183 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -867,11 +867,11 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { } if (isTargetBroker(data.sourceBroker())) { - // If data.force() is true, it means that this Free state is from the orphan cleanup job, where - // the source broker is likely unavailable. In this case, we don't tombstone it immediately. + // If data.force(), try closeServiceUnit and tombstone the bundle. CompletableFuture future = (data.force() ? closeServiceUnit(serviceUnit, true) - : tombstoneAsync(serviceUnit)).thenApply(__ -> null); + .thenCompose(__ -> tombstoneAsync(serviceUnit)) + : CompletableFuture.completedFuture(0)).thenApply(__ -> null); stateChangeListeners.notifyOnCompletion(future, serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, null)); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java index 991263527bae9..6b745345c0a43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java @@ -195,7 +195,17 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable } switch (state) { - case Init, Owned -> complete(serviceUnit, t); + case Free -> { + if (!data.force()) { + complete(serviceUnit, t); + } + } + case Init -> { + if (data.force()) { + complete(serviceUnit, t); + } + } + case Owned -> complete(serviceUnit, t); case Releasing -> LatencyMetric.RELEASE.endMeasurement(serviceUnit); case Assigning -> LatencyMetric.ASSIGN.endMeasurement(serviceUnit); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 96936b3a5c05c..80559b736c6ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -837,7 +837,7 @@ public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, boolean closeWithoutWaitingClientDisconnect) { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { return ExtensibleLoadManagerImpl.get(loadManager.get()) - .unloadNamespaceBundleAsync(bundle, destinationBroker); + .unloadNamespaceBundleAsync(bundle, destinationBroker, false); } // unload namespace bundle OwnedBundle ob = ownershipCache.getOwnedBundle(bundle); @@ -1286,7 +1286,8 @@ public CompletableFuture removeOwnedServiceUnitAsync(NamespaceBundle nsBun CompletableFuture future; if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); - future = extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty()); + future = extensibleLoadManager.unloadNamespaceBundleAsync( + nsBundle, Optional.empty(), true); } else { future = ownershipCache.removeOwnership(nsBundle); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index b557f0e484e2a..c0fdd95a6a3db 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -1032,8 +1032,8 @@ public void unloadTest() channel1.publishUnloadEventAsync(unload); - waitUntilState(channel1, bundle, Init); - waitUntilState(channel2, bundle, Init); + waitUntilState(channel1, bundle, Free); + waitUntilState(channel2, bundle, Free); var owner1 = channel1.getOwnerAsync(bundle); var owner2 = channel2.getOwnerAsync(bundle); @@ -1054,10 +1054,10 @@ public void unloadTest() channel2.publishUnloadEventAsync(unload2); - waitUntilState(channel1, bundle, Init); - waitUntilState(channel2, bundle, Init); + waitUntilState(channel1, bundle, Free); + waitUntilState(channel2, bundle, Free); - // test monitor if Init -> Init + // test monitor if Free -> Init FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1 , true); FieldUtils.writeDeclaredField(channel1, @@ -1078,7 +1078,7 @@ public void unloadTest() var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; validateMonitorCounters(leader, 0, - 0, + 1, 0, 0, 0, @@ -1105,8 +1105,8 @@ public void assignTestWhenDestBrokerProducerFails() channel1.publishUnloadEventAsync(unload); - waitUntilState(channel1, bundle, Init); - waitUntilState(channel2, bundle, Init); + waitUntilState(channel1, bundle, Free); + waitUntilState(channel2, bundle, Free); assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get()); assertEquals(Optional.empty(), channel2.getOwnerAsync(bundle).get()); @@ -1188,8 +1188,8 @@ public void splitTestWhenProducerFails() channel1.publishUnloadEventAsync(unload); - waitUntilState(channel1, bundle, Init); - waitUntilState(channel2, bundle, Init); + waitUntilState(channel1, bundle, Free); + waitUntilState(channel2, bundle, Free); channel1.publishAssignEventAsync(bundle, brokerId1); @@ -1665,6 +1665,9 @@ public void testActiveGetOwner() throws Exception { "inFlightStateWaitingTimeInMillis", 20 * 1000, true); start = System.currentTimeMillis(); assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty()); + waitUntilState(channel1, bundle, Init); + waitUntilState(channel2, bundle, Init); + assertTrue(System.currentTimeMillis() - start < 20_000); // simulate ownership cleanup(brokerId1 selected owner) by the leader channel overrideTableViews(bundle, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java index 0fcc1d12e5837..f7deb072688c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java @@ -122,13 +122,16 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int assertEquals(inFlightUnloadRequestMap.size(), 1); manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null); + new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, true, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); + // Success with Init state. manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null); + new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, false, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, true, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 0); - future.get(); assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1); @@ -137,17 +140,30 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int bundle, unloadDecision, 5, TimeUnit.SECONDS); inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 0); - future.get(); assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2); + + // Success with Free state. + future = manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, unloadDecision, 5, TimeUnit.SECONDS); + inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, true, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, false, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 0); + future.get(); + assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 3); + + } @Test