From d7ffdfcd08e34bdfdbcbe42b2b1174af0125bbf5 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Sat, 18 May 2024 06:12:13 -0700 Subject: [PATCH] [fix][broker] Tombstone Deleted and Free state bundles --- .../channel/ServiceUnitStateChannelImpl.java | 28 +++++---- .../extensions/manager/UnloadManager.java | 4 +- .../ExtensibleLoadManagerImplTest.java | 39 ++++++++++-- .../channel/ServiceUnitStateChannelTest.java | 59 ++++++++++--------- .../extensions/manager/UnloadManagerTest.java | 5 +- 5 files changed, 86 insertions(+), 49 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 9821ce56420ed..fa8c914a8b50b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -834,19 +834,17 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) { private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) { if (isTargetBroker(data.sourceBroker())) { ServiceUnitStateData next; - CompletableFuture unloadFuture; if (isTransferCommand(data)) { next = new ServiceUnitStateData( Assigning, data.dstBroker(), data.sourceBroker(), getNextVersionId(data)); - // If the optimized bundle unload is disabled, disconnect the clients at time of RELEASE. 
- var disconnectClients = !pulsar.getConfig().isLoadBalancerMultiPhaseBundleUnload(); - unloadFuture = closeServiceUnit(serviceUnit, disconnectClients); + } else { next = new ServiceUnitStateData( Free, null, data.sourceBroker(), getNextVersionId(data)); - unloadFuture = closeServiceUnit(serviceUnit, true); } - stateChangeListeners.notifyOnCompletion(unloadFuture + var disconnectClients = !pulsar.getConfig().isLoadBalancerMultiPhaseBundleUnload(); + // If the optimized bundle unload is disabled, disconnect the clients at time of RELEASE. + stateChangeListeners.notifyOnCompletion(closeServiceUnit(serviceUnit, disconnectClients) .thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, next)); } @@ -866,9 +864,13 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { } if (isTargetBroker(data.sourceBroker())) { - stateChangeListeners.notifyOnCompletion( - data.force() ? closeServiceUnit(serviceUnit, true) - : CompletableFuture.completedFuture(0), serviceUnit, data) + CompletableFuture unloadFuture = closeServiceUnit(serviceUnit, true); + // If data.force() is true, it means that this Free state is from the orphan cleanup job, where + // the source broker is likely unavailable. In this case, we don't tombstone it immediately. + CompletableFuture future = + (data.force() ? 
unloadFuture + : unloadFuture.thenCompose(__ -> tombstoneAsync(serviceUnit))).thenApply(__ -> null); + stateChangeListeners.notifyOnCompletion(future, serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, null)); } else { stateChangeListeners.notify(serviceUnit, data, null); @@ -880,9 +882,13 @@ private void handleDeleteEvent(String serviceUnit, ServiceUnitStateData data) { if (getOwnerRequest != null) { getOwnerRequest.completeExceptionally(new IllegalStateException(serviceUnit + "has been deleted.")); } - stateChangeListeners.notify(serviceUnit, data, null); + if (isTargetBroker(data.sourceBroker())) { - log(null, serviceUnit, data, null); + stateChangeListeners.notifyOnCompletion( + tombstoneAsync(serviceUnit), serviceUnit, data) + .whenComplete((__, e) -> log(e, serviceUnit, data, null)); + } else { + stateChangeListeners.notify(serviceUnit, data, null); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java index ffae9475243da..991263527bae9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java @@ -175,7 +175,7 @@ public void beforeEvent(String serviceUnit, ServiceUnitStateData data) { public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { ServiceUnitState state = ServiceUnitStateData.state(data); - if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) { + if ((state == Owned || state == Assigning) && StringUtils.isBlank(data.sourceBroker())) { if (log.isDebugEnabled()) { log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit); } @@ -195,7 +195,7 @@ public void handleEvent(String serviceUnit, 
ServiceUnitStateData data, Throwable } switch (state) { - case Free, Owned -> complete(serviceUnit, t); + case Init, Owned -> complete(serviceUnit, t); case Releasing -> LatencyMetric.RELEASE.endMeasurement(serviceUnit); case Assigning -> LatencyMetric.ASSIGN.endMeasurement(serviceUnit); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 8b96ed04f64de..69acedc2273d3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -375,7 +375,7 @@ public boolean test(NamespaceBundle namespaceBundle) { assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); Awaitility.await().untilAsserted(() -> { assertEquals(onloadCount.get(), 1); - assertEquals(unloadCount.get(), 1); + assertEquals(unloadCount.get(), 2); //one from releasing and one from Free }); broker = admin.lookups().lookupTopic(topicName.toString()); @@ -385,7 +385,7 @@ public boolean test(NamespaceBundle namespaceBundle) { Awaitility.await().untilAsserted(() -> { checkOwnershipState(finalBroker, bundle); assertEquals(onloadCount.get(), 2); - assertEquals(unloadCount.get(), 1); + assertEquals(unloadCount.get(), 2); }); @@ -402,7 +402,7 @@ public boolean test(NamespaceBundle namespaceBundle) { admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(), dstBrokerUrl); Awaitility.await().untilAsserted(() -> { assertEquals(onloadCount.get(), 3); - assertEquals(unloadCount.get(), 3); //one from releasing and one from owned + assertEquals(unloadCount.get(), 4); //one from releasing and one from owned }); assertEquals(admin.lookups().lookupTopic(topicName.toString()), 
dstBrokerServiceUrl); @@ -739,7 +739,8 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle) @Test(timeOut = 30 * 1000) public void testSplitBundleAdminAPI() throws Exception { - String namespace = defaultTestNamespace; + final String namespace = "public/testSplitBundleAdminAPI"; + admin.namespaces().createNamespace(namespace, 1); Pair topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-split"); TopicName topicName = topicAndBundle.getLeft(); admin.topics().createPartitionedTopic(topicName.toString(), 10); @@ -793,6 +794,30 @@ public boolean test(NamespaceBundle namespaceBundle) { } catch (PulsarAdminException ex) { assertTrue(ex.getMessage().contains("Invalid bundle range")); } + + + // delete and retry + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + admin.namespaces().deleteNamespace(namespace); + }); + admin.namespaces().createNamespace(namespace, 1); + admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null); + + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + BundlesData bundlesData = admin.namespaces().getBundles(namespace); + assertEquals(bundlesData.getNumBundles(), numBundles + 1); + String lowBundle = String.format("0x%08x", bundleRanges.get(0)); + String midBundle = String.format("0x%08x", mid); + String highBundle = String.format("0x%08x", bundleRanges.get(1)); + assertTrue(bundlesData.getBoundaries().contains(lowBundle)); + assertTrue(bundlesData.getBoundaries().contains(midBundle)); + assertTrue(bundlesData.getBoundaries().contains(highBundle)); + assertEquals(splitCount.get(), 2); + }); } @Test(timeOut = 30 * 1000) @@ -1656,7 +1681,11 @@ public void testTryAcquiringOwnership() NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get(); assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl()) .contains(namespaceEphemeralData.getNativeUrl())); - 
admin.namespaces().deleteNamespace(namespace); + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + admin.namespaces().deleteNamespace(namespace, true); + }); } @Test(timeOut = 30 * 1000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 1076f92037f10..b557f0e484e2a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -576,11 +576,11 @@ public void splitAndRetryTest() throws Exception { childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty())); channel1.publishSplitEventAsync(split); - waitUntilState(channel1, bundle, Deleted); - waitUntilState(channel2, bundle, Deleted); + waitUntilState(channel1, bundle, Init); + waitUntilState(channel2, bundle, Init); - validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0); - validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0); + validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0); + validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0); validateEventCounters(channel1, 1, 0, 1, 0, 0, 0); validateEventCounters(channel2, 0, 0, 0, 0, 0, 0); // Verify the retry count @@ -620,7 +620,7 @@ public void splitAndRetryTest() throws Exception { var leader = channel1.isChannelOwnerAsync().get() ? 
channel1 : channel2; validateMonitorCounters(leader, 0, - 1, + 0, 0, 0, 0, @@ -1032,8 +1032,8 @@ public void unloadTest() channel1.publishUnloadEventAsync(unload); - waitUntilState(channel1, bundle, Free); - waitUntilState(channel2, bundle, Free); + waitUntilState(channel1, bundle, Init); + waitUntilState(channel2, bundle, Init); var owner1 = channel1.getOwnerAsync(bundle); var owner2 = channel2.getOwnerAsync(bundle); @@ -1054,10 +1054,10 @@ public void unloadTest() channel2.publishUnloadEventAsync(unload2); - waitUntilState(channel1, bundle, Free); - waitUntilState(channel2, bundle, Free); + waitUntilState(channel1, bundle, Init); + waitUntilState(channel2, bundle, Init); - // test monitor if Free -> Init + // test monitor if Init -> Init FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1 , true); FieldUtils.writeDeclaredField(channel1, @@ -1078,7 +1078,7 @@ public void unloadTest() var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; validateMonitorCounters(leader, 0, - 1, + 0, 0, 0, 0, @@ -1105,8 +1105,8 @@ public void assignTestWhenDestBrokerProducerFails() channel1.publishUnloadEventAsync(unload); - waitUntilState(channel1, bundle, Free); - waitUntilState(channel2, bundle, Free); + waitUntilState(channel1, bundle, Init); + waitUntilState(channel2, bundle, Init); assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get()); assertEquals(Optional.empty(), channel2.getOwnerAsync(bundle).get()); @@ -1188,8 +1188,8 @@ public void splitTestWhenProducerFails() channel1.publishUnloadEventAsync(unload); - waitUntilState(channel1, bundle, Free); - waitUntilState(channel2, bundle, Free); + waitUntilState(channel1, bundle, Init); + waitUntilState(channel2, bundle, Init); channel1.publishAssignEventAsync(bundle, brokerId1); @@ -1236,15 +1236,15 @@ public void splitTestWhenProducerFails() var leader = channel1.isChannelOwnerAsync().get() ? 
channel1 : channel2; - waitUntilStateWithMonitor(leader, bundle, Deleted); - waitUntilStateWithMonitor(channel1, bundle, Deleted); - waitUntilStateWithMonitor(channel2, bundle, Deleted); + waitUntilStateWithMonitor(leader, bundle, Init); + waitUntilStateWithMonitor(channel1, bundle, Init); + waitUntilStateWithMonitor(channel2, bundle, Init); var ownerAddr1 = channel1.getOwnerAsync(bundle); var ownerAddr2 = channel2.getOwnerAsync(bundle); - assertTrue(ownerAddr1.isCompletedExceptionally()); - assertTrue(ownerAddr2.isCompletedExceptionally()); + assertTrue(ownerAddr1.get().isEmpty()); + assertTrue(ownerAddr2.get().isEmpty()); FieldUtils.writeDeclaredField(channel1, @@ -1428,13 +1428,15 @@ public void splitAndRetryFailureTest() throws Exception { var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; ((ServiceUnitStateChannelImpl) leader) .monitorOwnerships(List.of(brokerId1, brokerId2)); - waitUntilState(leader, bundle3, Deleted); - waitUntilState(channel1, bundle3, Deleted); - waitUntilState(channel2, bundle3, Deleted); + + waitUntilState(leader, bundle3, Init); + waitUntilState(channel1, bundle3, Init); + waitUntilState(channel2, bundle3, Init); + - validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0, 1, 0); - validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 1, 0); + validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 1, 0, 1, 0); + validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 1, 0, 1, 0); validateEventCounters(channel1, 1, 0, 1, 0, 0, 0); validateEventCounters(channel2, 0, 0, 0, 0, 0, 0); @@ -1464,7 +1466,7 @@ public void splitAndRetryFailureTest() throws Exception { validateMonitorCounters(leader, 0, - 1, + 0, 1, 0, 0, @@ -1542,7 +1544,7 @@ public void testOverrideInactiveBrokerStateData() waitUntilNewOwner(channel2, ownedBundle, brokerId2); assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get()); 
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally()); - assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally()); + assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty()); // clean-up FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true); @@ -1605,7 +1607,7 @@ public void testOverrideOrphanStateData() waitUntilNewOwner(channel2, ownedBundle, broker); assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get()); assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally()); - assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally()); + assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty()); // clean-up FieldUtils.writeDeclaredField(channel1, @@ -1664,7 +1666,6 @@ public void testActiveGetOwner() throws Exception { start = System.currentTimeMillis(); assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty()); assertTrue(System.currentTimeMillis() - start < 20_000); - // simulate ownership cleanup(brokerId1 selected owner) by the leader channel overrideTableViews(bundle, new ServiceUnitStateData(Owned, broker, null, 1)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java index 5d0abea33577b..0fcc1d12e5837 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java @@ -122,12 +122,13 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int assertEquals(inFlightUnloadRequestMap.size(), 1); manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null); + new 
ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null); + new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 0); + future.get(); assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);