Skip to content

Commit

Permalink
[ISSUE#6342] Local SyncStatSet sync to remote value when changeToMast…
Browse files Browse the repository at this point in the history
…er (apache#6352)

* fix ISSUE#6342

* refactor

* correct the syncStateSet in elect process.

* move syncStateSet into request/response's body.

* optimize the broker electing switch's branch.
  • Loading branch information
GenerousMan authored and fuyou001 committed Mar 16, 2023
1 parent 53da8d9 commit bd0ab72
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,26 +227,25 @@ public void shutdown() {
}

public synchronized void changeBrokerRole(final Long newMasterBrokerId, final String newMasterAddress, final Integer newMasterEpoch,
final Integer syncStateSetEpoch) {
final Integer syncStateSetEpoch, final Set<Long> syncStateSet) {
if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) {
if (newMasterBrokerId.equals(this.brokerControllerId)) {
changeToMaster(newMasterEpoch, syncStateSetEpoch);
changeToMaster(newMasterEpoch, syncStateSetEpoch, syncStateSet);
} else {
changeToSlave(newMasterAddress, newMasterEpoch, newMasterBrokerId);
}
}
}

public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch) {
public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set<Long> syncStateSet) {
synchronized (this) {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.brokerAddress, newMasterEpoch);

this.masterEpoch = newMasterEpoch;

// Change SyncStateSet
final HashSet<Long> newSyncStateSet = new HashSet<>();
newSyncStateSet.add(this.brokerControllerId);
final HashSet<Long> newSyncStateSet = new HashSet<>(syncStateSet);
changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);

// Change record
Expand Down Expand Up @@ -365,8 +364,10 @@ private void handleSlaveSynchronize(final BrokerRole role) {
private boolean brokerElect() {
// Broker try to elect itself as a master in broker set.
try {
ElectMasterResponseHeader tryElectResponse = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
Pair<ElectMasterResponseHeader, Set<Long>> tryElectResponsePair = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.getBrokerName(), this.brokerControllerId);
ElectMasterResponseHeader tryElectResponse = tryElectResponsePair.getObject1();
Set<Long> syncStateSet = tryElectResponsePair.getObject2();
final String masterAddress = tryElectResponse.getMasterAddress();
final Long masterBrokerId = tryElectResponse.getMasterBrokerId();
if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) {
Expand All @@ -375,7 +376,7 @@ private boolean brokerElect() {
}

if (masterBrokerId.equals(this.brokerControllerId)) {
changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch());
changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch(), syncStateSet);
} else {
changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId());
}
Expand Down Expand Up @@ -544,15 +545,17 @@ private boolean createMetadataFileAndDeleteTemp() {
*/
private boolean registerBrokerToController() {
try {
RegisterBrokerToControllerResponseHeader response = this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, controllerLeaderAddress);
if (response == null) return false;
Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> responsePair = this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, controllerLeaderAddress);
if (responsePair == null) return false;
RegisterBrokerToControllerResponseHeader response = responsePair.getObject1();
Set<Long> syncStateSet = responsePair.getObject2();
final Long masterBrokerId = response.getMasterBrokerId();
final String masterAddress = response.getMasterAddress();
if (masterBrokerId == null) {
return true;
}
if (this.brokerControllerId.equals(masterBrokerId)) {
changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch());
changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch(), syncStateSet);
} else {
changeToSlave(masterAddress, response.getMasterEpoch(), masterBrokerId);
}
Expand Down Expand Up @@ -635,7 +638,7 @@ private void schedulingSyncBrokerMetadata() {
if (StringUtils.isNoneEmpty(newMasterAddress) && masterBrokerId != null) {
if (masterBrokerId.equals(this.brokerControllerId)) {
// If this broker is now the master
changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch());
changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch(), syncStateSet.getSyncStateSet());
} else {
// If this broker is now the slave, and master has been changed
changeToSlave(newMasterAddress, newMasterEpoch, masterBrokerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.ElectMasterResponseBody;
import org.apache.rocketmq.remoting.protocol.body.GetBrokerMemberGroupResponseBody;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
Expand Down Expand Up @@ -1172,7 +1173,7 @@ public SyncStateSet alterSyncStateSet(
/**
* Broker try to elect itself as a master in broker set
*/
public ElectMasterResponseHeader brokerElect(String controllerAddress, String clusterName, String brokerName,
public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String controllerAddress, String clusterName, String brokerName,
Long brokerId) throws Exception {

final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
Expand All @@ -1184,12 +1185,15 @@ public ElectMasterResponseHeader brokerElect(String controllerAddress, String cl
throw new MQBrokerException(response.getCode(), "Controller leader was changed");
}
case CONTROLLER_BROKER_NEED_TO_BE_REGISTERED:
throw new MQBrokerException(response.getCode(), response.getRemark());
case CONTROLLER_ELECT_MASTER_FAILED:
throw new MQBrokerException(response.getCode(), response.getRemark());
case CONTROLLER_MASTER_STILL_EXIST:
case SUCCESS:
return (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
final ElectMasterResponseBody responseBody = RemotingSerializable.decode(response.getBody(), ElectMasterResponseBody.class);
return new Pair<>(responseHeader, responseBody.getSyncStateSet());
}

throw new MQBrokerException(response.getCode(), response.getRemark());
}

Expand All @@ -1215,13 +1219,15 @@ public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public RegisterBrokerToControllerResponseHeader registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception {
public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception {
final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerId, brokerAddress);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
if (response.getCode() == SUCCESS) {
return (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
Set<Long> syncStateSet = RemotingSerializable.decode(response.getBody(), SyncStateSet.class).getSyncStateSet();
return new Pair<>(responseHeader, syncStateSet);
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody;
import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
Expand Down Expand Up @@ -2628,14 +2629,15 @@ private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx,
private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
NotifyBrokerRoleChangedRequestHeader requestHeader = (NotifyBrokerRoleChangedRequestHeader) request.decodeCommandCustomHeader(NotifyBrokerRoleChangedRequestHeader.class);
SyncStateSet syncStateSetInfo = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);

RemotingCommand response = RemotingCommand.createResponseCommand(null);

LOGGER.info("Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}", requestHeader);

final ReplicasManager replicasManager = this.brokerController.getReplicasManager();
if (replicasManager != null) {
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch());
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), syncStateSetInfo.getSyncStateSet());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
Expand Down
Loading

0 comments on commit bd0ab72

Please sign in to comment.