Skip to content

Commit

Permalink
[fix] [broker] Part-2: Replicator can not created successfully due to…
Browse files Browse the repository at this point in the history
… an orphan replicator in the previous topic owne
  • Loading branch information
poorbarcode committed Jan 22, 2024
1 parent b14fcb4 commit a0e6609
Showing 1 changed file with 35 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1413,8 +1413,6 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
List<CompletableFuture<Void>> futures = new ArrayList<>();
subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty())));
if (closeIfClientsConnected) {
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
}
FutureUtil.waitForAll(futures).thenRunAsync(() -> {
Expand Down Expand Up @@ -1458,16 +1456,18 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(PersistentTopic.this);
disconnectReplicators().thenAccept(__ -> {
brokerService.removeTopicFromCache(PersistentTopic.this);

dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);

unregisterTopicPolicyListener();
unregisterTopicPolicyListener();

log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
});
}

@Override
Expand Down Expand Up @@ -1555,8 +1555,6 @@ public CompletableFuture<Void> close(
List<CompletableFuture<Void>> futures = new ArrayList<>();

futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
if (disconnectClients) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData -> {
Expand Down Expand Up @@ -1604,22 +1602,26 @@ public CompletableFuture<Void> close(
ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
if (disconnectClients) {
// Everything is now closed, remove the topic from map
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
disconnectReplicators().thenAccept(__ -> {
if (disconnectClients) {
// Everything is now closed, remove the topic from map
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
});
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
if (disconnectClients) {
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
disconnectReplicators().thenAccept(__ -> {
if (disconnectClients) {
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
});
}
}, null);
}).exceptionally(exception -> {
Expand All @@ -1632,6 +1634,18 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
return closeFuture;
}

private CompletableFuture<Void> disconnectReplicators() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
return FutureUtil.waitForAll(futures).handle((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to close replicator, proceeding anyway.", topic, ex);
}
return null;
});
}

private void disposeTopic(CompletableFuture<?> closeFuture) {
brokerService.removeTopicFromCache(PersistentTopic.this)
.thenRun(() -> {
Expand Down

0 comments on commit a0e6609

Please sign in to comment.