diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a375ebf2809eda..1ba7507024d8c9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1413,8 +1413,6 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, List> futures = new ArrayList<>(); subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty()))); if (closeIfClientsConnected) { - replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); - shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); } FutureUtil.waitForAll(futures).thenRunAsync(() -> { @@ -1458,16 +1456,18 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(PersistentTopic.this); + disconnectReplicators().thenAccept(__ -> { + brokerService.removeTopicFromCache(PersistentTopic.this); - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); + subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - unregisterTopicPolicyListener(); + unregisterTopicPolicyListener(); - log.info("[{}] Topic deleted", topic); - deleteFuture.complete(null); + log.info("[{}] Topic deleted", topic); + deleteFuture.complete(null); + }); } @Override @@ -1555,8 +1555,6 @@ public CompletableFuture close( List> futures = new ArrayList<>(); futures.add(transactionBuffer.closeAsync()); - replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); - shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); if (disconnectClients) { futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( brokerService.getPulsar(), topic).thenAccept(lookupData -> { @@ -1604,22 +1602,26 @@ public CompletableFuture close( ledger.asyncClose(new CloseCallback() { @Override public void closeComplete(Object ctx) { - if (disconnectClients) { - // Everything is now closed, remove the topic from map - disposeTopic(closeFuture); - } else { - closeFuture.complete(null); - } + disconnectReplicators().thenAccept(__ -> { + if (disconnectClients) { + // Everything is now closed, remove the topic from map + disposeTopic(closeFuture); + } else { + closeFuture.complete(null); + } + }); } @Override public void closeFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); - if (disconnectClients) { - disposeTopic(closeFuture); - } else { - closeFuture.complete(null); - } + disconnectReplicators().thenAccept(__ -> { + if (disconnectClients) { + disposeTopic(closeFuture); + } else { + closeFuture.complete(null); + } + }); } }, null); }).exceptionally(exception -> { @@ -1632,6 +1634,18 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { return closeFuture; } + private CompletableFuture disconnectReplicators() { + List> futures = new ArrayList<>(); + replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); + shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); + return FutureUtil.waitForAll(futures).handle((__, ex) -> { + if (ex != null) { + log.error("[{}] Failed to close replicator, proceeding anyway.", topic, ex); + } + return null; + }); + } + private void disposeTopic(CompletableFuture closeFuture) { brokerService.removeTopicFromCache(PersistentTopic.this) .thenRun(() -> {