Skip to content

Commit

Permalink
[fix][ml] Make mlOwnershipChecker asynchronous so that it doesn't blo…
Browse files Browse the repository at this point in the history
…ck/deadlock threads (apache#21333)
  • Loading branch information
lhotari authored and vinayakmalik12 committed Oct 12, 2023
1 parent d45a00d commit 68bdfa9
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ ManagedLedger open(String name, ManagedLedgerConfig config)
* opaque context
*/
void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback,
Supplier<Boolean> mlOwnershipChecker, Object ctx);
Supplier<CompletableFuture<Boolean>> mlOwnershipChecker, Object ctx);

/**
* Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2685,32 +2685,47 @@ public void operationComplete(Void result, Stat stat) {
}

@Override
public void operationFailed(MetaStoreException e) {
if (e instanceof MetaStoreException.BadVersionException) {
public void operationFailed(MetaStoreException topLevelException) {
if (topLevelException instanceof MetaStoreException.BadVersionException) {
log.warn("[{}] Failed to update cursor metadata for {} due to version conflict {}",
ledger.name, name, e.getMessage());
ledger.name, name, topLevelException.getMessage());
// it means previous owner of the ml might have updated the version incorrectly. So, check
// the ownership and refresh the version again.
if (ledger.mlOwnershipChecker != null && ledger.mlOwnershipChecker.get()) {
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
new MetaStoreCallback<ManagedCursorInfo>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
updateCursorLedgerStat(info, stat);
}

@Override
public void operationFailed(MetaStoreException e) {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Failed to refresh cursor metadata-version for {} due "
+ "to {}", ledger.name, name, e.getMessage());
}
}
});
if (ledger.mlOwnershipChecker != null) {
ledger.mlOwnershipChecker.get().whenComplete((hasOwnership, t) -> {
if (t == null && hasOwnership) {
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
new MetaStoreCallback<>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
updateCursorLedgerStat(info, stat);
// fail the top level call so that the caller can retry
callback.operationFailed(topLevelException);
}

@Override
public void operationFailed(MetaStoreException e) {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Failed to refresh cursor metadata-version "
+ "for {} due to {}", ledger.name, name,
e.getMessage());
}
// fail the top level call so that the caller can retry
callback.operationFailed(topLevelException);
}
});
} else {
// fail the top level call so that the caller can retry
callback.operationFailed(topLevelException);
}
});
} else {
callback.operationFailed(topLevelException);
}
} else {
callback.operationFailed(topLevelException);
}
callback.operationFailed(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ public void asyncOpen(String name, OpenLedgerCallback callback, Object ctx) {

@Override
public void asyncOpen(final String name, final ManagedLedgerConfig config, final OpenLedgerCallback callback,
Supplier<Boolean> mlOwnershipChecker, final Object ctx) {
Supplier<CompletableFuture<Boolean>> mlOwnershipChecker, final Object ctx) {
if (closed) {
callback.openLedgerFailed(new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

private static final Random random = new Random(System.currentTimeMillis());
private long maximumRolloverTimeMs;
protected final Supplier<Boolean> mlOwnershipChecker;
protected final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker;

volatile PositionImpl lastConfirmedEntry;

Expand Down Expand Up @@ -336,7 +336,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
}
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
final String name, final Supplier<Boolean> mlOwnershipChecker) {
final String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
this.factory = factory;
this.bookKeeper = bookKeeper;
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback;
Expand All @@ -50,7 +51,7 @@ public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper,
MetaStore store, ManagedLedgerConfig config,
OrderedScheduler scheduledExecutor,
String name, final Supplier<Boolean> mlOwnershipChecker) {
String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker);
this.sourceMLName = config.getShadowSourceName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3389,7 +3389,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
}
}, checkOwnershipFlag ? () -> true : null, null);
}, checkOwnershipFlag ? () -> CompletableFuture.completedFuture(true) : null, null);
latch.await();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
topicFuture.completeExceptionally(new PersistenceException(exception));
}
}
}, () -> isTopicNsOwnedByBroker(topicName), null);
}, () -> isTopicNsOwnedByBrokerAsync(topicName), null);

}).exceptionally((exception) -> {
log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception);
Expand Down Expand Up @@ -2136,13 +2136,16 @@ public void monitorBacklogQuota() {
});
}

public boolean isTopicNsOwnedByBroker(TopicName topicName) {
try {
return pulsar.getNamespaceService().isServiceUnitOwned(topicName);
} catch (Exception e) {
log.warn("Failed to check the ownership of the topic: {}, {}", topicName, e.getMessage());
}
return false;
public CompletableFuture<Boolean> isTopicNsOwnedByBrokerAsync(TopicName topicName) {
return pulsar.getNamespaceService().isServiceUnitOwnedAsync(topicName)
.handle((hasOwnership, t) -> {
if (t == null) {
return hasOwnership;
} else {
log.warn("Failed to check the ownership of the topic: {}, {}", topicName, t.getMessage());
return false;
}
});
}

public CompletableFuture<Void> checkTopicNsOwnership(final String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
, originPersistentTopic.getName(), subscription.getName(), exception);
pendingAckStoreFuture.completeExceptionally(exception);
}
}, () -> true, null);
}, () -> CompletableFuture.completedFuture(true), null);
}).exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
log.error("[{}] [{}] Failed to get managedLedger config when init pending ack store!",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws Excep

//load the nameserver, but topic is not init.
log.info("lookup:{}",admin.lookups().lookupTopic(topic));
assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBroker(topicName));
assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBrokerAsync(topicName).join());
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
//make sure namespace policy reader is fully started.
Awaitility.await().untilAsserted(()-> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testCreateTopicWithNotTopicNsOwnedBroker() {
int verifiedBrokerNum = 0;
for (PulsarService pulsarService : this.getPulsarServiceList()) {
BrokerService bs = pulsarService.getBrokerService();
if (bs.isTopicNsOwnedByBroker(TopicName.get(topicName))) {
if (bs.isTopicNsOwnedByBrokerAsync(TopicName.get(topicName)).join()) {
continue;
}
verifiedBrokerNum ++;
Expand Down

0 comments on commit 68bdfa9

Please sign in to comment.