diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java index 15b68d61..81e58966 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java @@ -88,6 +88,7 @@ public class DLedgerConfig { private boolean isEnableBatchPush = false; private int maxBatchPushSize = 4 * 1024; + private long leadershipTransferWaitTimeout = 1000; public String getDefaultPath() { return storeBaseDir + File.separator + "dledger-" + selfId; @@ -398,4 +399,12 @@ public int getMaxBatchPushSize() { public void setMaxBatchPushSize(int maxBatchPushSize) { this.maxBatchPushSize = maxBatchPushSize; } + + public long getLeadershipTransferWaitTimeout() { + return leadershipTransferWaitTimeout; + } + + public void setLeadershipTransferWaitTimeout(long leadershipTransferWaitTimeout) { + this.leadershipTransferWaitTimeout = leadershipTransferWaitTimeout; + } } diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java index 24e16514..e3037ad1 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java @@ -297,6 +297,25 @@ public CompletableFuture handleLeadershipTransfer(Le // It's the transferee received the take leadership command. PreConditions.check(request.getTransferId().equals(memberState.getLeaderId()), DLedgerResponseCode.INCONSISTENT_LEADER, "transfer=%s is not leader", request.getTransferId()); + long costTime = 0L; + long startTime = System.currentTimeMillis(); + long fallBehind = request.getTakeLeadershipLedgerIndex() - memberState.getLedgerEndIndex(); + + while (fallBehind > 0) { + + if (costTime > dLedgerConfig.getLeadershipTransferWaitTimeout()) { + throw new DLedgerException(DLedgerResponseCode.TAKE_LEADERSHIP_FAILED, + "transferee fall behind, wait timeout. timeout = {}, diff = {}", + dLedgerConfig.getLeadershipTransferWaitTimeout(), fallBehind); + } + + logger.warn("transferee fall behind, diff = {}", fallBehind); + Thread.sleep(10); + + fallBehind = request.getTakeLeadershipLedgerIndex() - memberState.getLedgerEndIndex(); + costTime = System.currentTimeMillis() - startTime; + } + return dLedgerLeaderElector.handleTakeLeadership(request); } else { return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(memberState.currTerm()).code(DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));