Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE#6342] Fix: Local SyncStatSet sync to remote value when changeToMaster #6352

Merged
merged 5 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,26 +227,25 @@ public void shutdown() {
}

public synchronized void changeBrokerRole(final Long newMasterBrokerId, final String newMasterAddress, final Integer newMasterEpoch,
final Integer syncStateSetEpoch) {
final Integer syncStateSetEpoch, final Set<Long> syncStateSet) {
if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) {
if (newMasterBrokerId.equals(this.brokerControllerId)) {
changeToMaster(newMasterEpoch, syncStateSetEpoch);
changeToMaster(newMasterEpoch, syncStateSetEpoch, syncStateSet);
} else {
changeToSlave(newMasterAddress, newMasterEpoch, newMasterBrokerId);
}
}
}

public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch) {
public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set<Long> syncStateSet) {
synchronized (this) {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.brokerAddress, newMasterEpoch);

this.masterEpoch = newMasterEpoch;

// Change SyncStateSet
final HashSet<Long> newSyncStateSet = new HashSet<>();
newSyncStateSet.add(this.brokerControllerId);
final HashSet<Long> newSyncStateSet = new HashSet<>(syncStateSet);
changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);

// Change record
Expand Down Expand Up @@ -365,8 +364,10 @@ private void handleSlaveSynchronize(final BrokerRole role) {
private boolean brokerElect() {
// Broker try to elect itself as a master in broker set.
try {
ElectMasterResponseHeader tryElectResponse = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
Pair<ElectMasterResponseHeader, Set<Long>> tryElectResponsePair = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.getBrokerName(), this.brokerControllerId);
ElectMasterResponseHeader tryElectResponse = tryElectResponsePair.getObject1();
Set<Long> syncStateSet = tryElectResponsePair.getObject2();
final String masterAddress = tryElectResponse.getMasterAddress();
final Long masterBrokerId = tryElectResponse.getMasterBrokerId();
if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) {
Expand All @@ -375,7 +376,7 @@ private boolean brokerElect() {
}

if (masterBrokerId.equals(this.brokerControllerId)) {
changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch());
changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch(), syncStateSet);
} else {
changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId());
}
Expand Down Expand Up @@ -544,15 +545,17 @@ private boolean createMetadataFileAndDeleteTemp() {
*/
private boolean registerBrokerToController() {
try {
RegisterBrokerToControllerResponseHeader response = this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, controllerLeaderAddress);
if (response == null) return false;
Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> responsePair = this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, controllerLeaderAddress);
if (responsePair == null) return false;
RegisterBrokerToControllerResponseHeader response = responsePair.getObject1();
Set<Long> syncStateSet = responsePair.getObject2();
final Long masterBrokerId = response.getMasterBrokerId();
final String masterAddress = response.getMasterAddress();
if (masterBrokerId == null) {
return true;
}
if (this.brokerControllerId.equals(masterBrokerId)) {
changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch());
changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch(), syncStateSet);
} else {
changeToSlave(masterAddress, response.getMasterEpoch(), masterBrokerId);
}
Expand Down Expand Up @@ -635,7 +638,7 @@ private void schedulingSyncBrokerMetadata() {
if (StringUtils.isNoneEmpty(newMasterAddress) && masterBrokerId != null) {
if (masterBrokerId.equals(this.brokerControllerId)) {
// If this broker is now the master
changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch());
changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch(), syncStateSet.getSyncStateSet());
} else {
// If this broker is now the slave, and master has been changed
changeToSlave(newMasterAddress, newMasterEpoch, masterBrokerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.ElectMasterResponseBody;
import org.apache.rocketmq.remoting.protocol.body.GetBrokerMemberGroupResponseBody;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
Expand Down Expand Up @@ -1172,7 +1173,7 @@ public SyncStateSet alterSyncStateSet(
/**
* Broker try to elect itself as a master in broker set
*/
public ElectMasterResponseHeader brokerElect(String controllerAddress, String clusterName, String brokerName,
public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String controllerAddress, String clusterName, String brokerName,
Long brokerId) throws Exception {

final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
Expand All @@ -1184,12 +1185,15 @@ public ElectMasterResponseHeader brokerElect(String controllerAddress, String cl
throw new MQBrokerException(response.getCode(), "Controller leader was changed");
}
case CONTROLLER_BROKER_NEED_TO_BE_REGISTERED:
throw new MQBrokerException(response.getCode(), response.getRemark());
case CONTROLLER_ELECT_MASTER_FAILED:
throw new MQBrokerException(response.getCode(), response.getRemark());
case CONTROLLER_MASTER_STILL_EXIST:
case SUCCESS:
return (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
final ElectMasterResponseBody responseBody = RemotingSerializable.decode(response.getBody(), ElectMasterResponseBody.class);
return new Pair<>(responseHeader, responseBody.getSyncStateSet());
}

throw new MQBrokerException(response.getCode(), response.getRemark());
}

Expand All @@ -1215,13 +1219,15 @@ public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public RegisterBrokerToControllerResponseHeader registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception {
public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception {
final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerId, brokerAddress);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
if (response.getCode() == SUCCESS) {
return (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
Set<Long> syncStateSet = RemotingSerializable.decode(response.getBody(), SyncStateSet.class).getSyncStateSet();
return new Pair<>(responseHeader, syncStateSet);
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody;
import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
Expand Down Expand Up @@ -2628,14 +2629,15 @@ private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx,
private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
NotifyBrokerRoleChangedRequestHeader requestHeader = (NotifyBrokerRoleChangedRequestHeader) request.decodeCommandCustomHeader(NotifyBrokerRoleChangedRequestHeader.class);
SyncStateSet syncStateSetInfo = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);

RemotingCommand response = RemotingCommand.createResponseCommand(null);

LOGGER.info("Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}", requestHeader);

final ReplicasManager replicasManager = this.brokerController.getReplicasManager();
if (replicasManager != null) {
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch());
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), syncStateSetInfo.getSyncStateSet());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
Expand Down
Loading