Skip to content

Commit

Permalink
suppport multiple preferred leader
Browse files Browse the repository at this point in the history
Signed-off-by: zhangyang21 <[email protected]>
  • Loading branch information
Git-Yang committed Sep 29, 2021
1 parent 9ea565e commit 28309cd
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 14 deletions.
16 changes: 13 additions & 3 deletions src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class DLedgerConfig {
private boolean enablePushToFollower = true;

@Parameter(names = {"--preferred-leader-id"}, description = "Preferred LeaderId")
private String preferredLeaderId;
private String preferredLeaderIds;
private long maxLeadershipTransferWaitIndex = 1000;
private int minTakeLeadershipVoteIntervalMs = 30;
private int maxTakeLeadershipVoteIntervalMs = 100;
Expand Down Expand Up @@ -341,12 +341,22 @@ public void setCheckPointInterval(long checkPointInterval) {
this.checkPointInterval = checkPointInterval;
}

@Deprecated
public String getPreferredLeaderId() {
return preferredLeaderId;
return preferredLeaderIds;
}

@Deprecated
public void setPreferredLeaderId(String preferredLeaderId) {
this.preferredLeaderId = preferredLeaderId;
this.preferredLeaderIds = preferredLeaderId;
}

public String getPreferredLeaderIds() {
return preferredLeaderIds;
}

public void setPreferredLeaderIds(String preferredLeaderIds) {
this.preferredLeaderIds = preferredLeaderIds;
}

public long getMaxLeadershipTransferWaitIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -372,8 +373,11 @@ private List<CompletableFuture<VoteResponse>> voteForQuorumResponses(long term,
}

private boolean isTakingLeadership() {
return memberState.getSelfId().equals(dLedgerConfig.getPreferredLeaderId())
|| memberState.getTermToTakeLeadership() == memberState.currTerm();
if (dLedgerConfig.getPreferredLeaderIds() != null && memberState.getTermToTakeLeadership() == memberState.currTerm()) {
List<String> preferredLeaderIds = Arrays.asList(dLedgerConfig.getPreferredLeaderIds().split(";"));
return preferredLeaderIds.contains(memberState.getSelfId());
}
return false;
}

private long getNextTimeToRequestVote() {
Expand Down
44 changes: 35 additions & 9 deletions src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@
import io.openmessaging.storage.dledger.utils.PreConditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -313,32 +316,55 @@ private void checkPreferredLeader() {
if (!memberState.isLeader()) {
return;
}
String preferredLeaderId = dLedgerConfig.getPreferredLeaderId();
if (preferredLeaderId == null || preferredLeaderId.equals(dLedgerConfig.getSelfId())) {

if (dLedgerConfig.getPreferredLeaderIds() == null) {
return;
}

if (!memberState.isPeerMember(preferredLeaderId)) {
logger.warn("preferredLeaderId = {} is not a peer member", preferredLeaderId);
if (memberState.getTransferee() != null) {
return;
}

if (memberState.getTransferee() != null) {
List<String> preferredLeaderIds = new ArrayList<>(Arrays.asList(dLedgerConfig.getPreferredLeaderIds().split(";")));
if (preferredLeaderIds.contains(dLedgerConfig.getSelfId())) {
return;
}

if (!memberState.getPeersLiveTable().containsKey(preferredLeaderId) ||
memberState.getPeersLiveTable().get(preferredLeaderId) == Boolean.FALSE) {
logger.warn("preferredLeaderId = {} is not online", preferredLeaderId);
Iterator<String> it = preferredLeaderIds.iterator();
while (it.hasNext()) {
String preferredLeaderId = it.next();
if (!memberState.isPeerMember(preferredLeaderId)) {
it.remove();
logger.warn("preferredLeaderId = {} is not a peer member", preferredLeaderId);
continue;
}

if (!memberState.getPeersLiveTable().containsKey(preferredLeaderId) ||
memberState.getPeersLiveTable().get(preferredLeaderId) == Boolean.FALSE.booleanValue()) {
it.remove();
logger.warn("preferredLeaderId = {} is not online", preferredLeaderId);
continue;
}

long fallBehind = dLedgerStore.getLedgerEndIndex() - dLedgerEntryPusher.getPeerWaterMark(memberState.currTerm(), preferredLeaderId);
if (fallBehind >= dLedgerConfig.getMaxLeadershipTransferWaitIndex()) {
logger.warn("preferredLeaderId = {} transferee fall behind index : {}", preferredLeaderId, fallBehind);
continue;
}
}

if (preferredLeaderIds.size() == 0) {
return;
}

String preferredLeaderId = preferredLeaderIds.get(0);

long fallBehind = dLedgerStore.getLedgerEndIndex() - dLedgerEntryPusher.getPeerWaterMark(memberState.currTerm(), preferredLeaderId);
logger.info("transferee fall behind index : {}", fallBehind);
if (fallBehind < dLedgerConfig.getMaxLeadershipTransferWaitIndex()) {
LeadershipTransferRequest request = new LeadershipTransferRequest();
request.setTerm(memberState.currTerm());
request.setTransfereeId(dLedgerConfig.getPreferredLeaderId());
request.setTransfereeId(preferredLeaderId);

try {
long startTransferTime = System.currentTimeMillis();
Expand Down

0 comments on commit 28309cd

Please sign in to comment.