From 7f0f92324d2d120bda92f456366e3ed17d90111a Mon Sep 17 00:00:00 2001 From: juntao Date: Wed, 15 Mar 2023 12:38:22 +0800 Subject: [PATCH 1/5] fix ISSUE#6342 --- .../broker/controller/ReplicasManager.java | 15 +++++++-------- .../processor/AdminBrokerProcessor.java | 2 +- .../ReplicasManagerRegisterTest.java | 19 ++++++++++++------- .../controller/ReplicasManagerTest.java | 19 ++++++++++++++++--- .../controller/ControllerManager.java | 2 +- .../impl/manager/ReplicasInfoManager.java | 6 ++++++ .../protocol/body/RoleChangeNotifyEntry.java | 13 +++++++++++-- .../NotifyBrokerRoleChangedRequestHeader.java | 14 +++++++++++++- .../controller/ElectMasterResponseHeader.java | 14 +++++++++++++- ...isterBrokerToControllerResponseHeader.java | 12 ++++++++++++ .../ha/autoswitch/AutoSwitchHAService.java | 10 ++++++++++ 11 files changed, 102 insertions(+), 24 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index 068187e4013..676488fd215 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -227,17 +227,17 @@ public void shutdown() { } public synchronized void changeBrokerRole(final Long newMasterBrokerId, final String newMasterAddress, final Integer newMasterEpoch, - final Integer syncStateSetEpoch) { + final Integer syncStateSetEpoch, final Set syncStateSet) { if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) { if (newMasterBrokerId.equals(this.brokerControllerId)) { - changeToMaster(newMasterEpoch, syncStateSetEpoch); + changeToMaster(newMasterEpoch, syncStateSetEpoch, syncStateSet); } else { changeToSlave(newMasterAddress, newMasterEpoch, newMasterBrokerId); } } } - public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch) { + public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set syncStateSet) { synchronized (this) { if (newMasterEpoch > this.masterEpoch) { LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.brokerAddress, newMasterEpoch); @@ -245,8 +245,7 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch this.masterEpoch = newMasterEpoch; // Change SyncStateSet - final HashSet newSyncStateSet = new HashSet<>(); - newSyncStateSet.add(this.brokerControllerId); + final HashSet newSyncStateSet = new HashSet<>(syncStateSet); changeSyncStateSet(newSyncStateSet, syncStateSetEpoch); // Change record @@ -375,7 +374,7 @@ private boolean brokerElect() { } if (masterBrokerId.equals(this.brokerControllerId)) { - changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch()); + changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch(), tryElectResponse.getSyncStateSet()); } else { changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId()); } @@ -552,7 +551,7 @@ private boolean registerBrokerToController() { return true; } if (this.brokerControllerId.equals(masterBrokerId)) { - changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch()); + changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch(), response.getSyncStateSet()); } else { changeToSlave(masterAddress, response.getMasterEpoch(), masterBrokerId); } @@ -635,7 +634,7 @@ private void schedulingSyncBrokerMetadata() { if (StringUtils.isNoneEmpty(newMasterAddress) && masterBrokerId != null) { if (masterBrokerId.equals(this.brokerControllerId)) { // If this broker is now the master - changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch()); + changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch(), syncStateSet.getSyncStateSet()); } else { // If this broker is now the slave, and master has been changed changeToSlave(newMasterAddress, newMasterEpoch, masterBrokerId); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index f3e446a652e..d53c969f20d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -2635,7 +2635,7 @@ private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx, final ReplicasManager replicasManager = this.brokerController.getReplicasManager(); if (replicasManager != null) { - replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch()); + replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), requestHeader.getSyncStateSet()); } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java index f0e82b6388e..7c4276390bc 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java @@ -45,6 +45,9 @@ import java.io.File; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.UUID; import static org.awaitility.Awaitility.await; @@ -70,6 +73,7 @@ public class ReplicasManagerRegisterTest { public static final String CONTROLLER_ADDR = "127.0.0.1:8888"; public static final BrokerConfig BROKER_CONFIG; + private final HashSet syncStateSet = new HashSet<>(Arrays.asList(1L)); static { BROKER_CONFIG = new BrokerConfig(); @@ -122,10 +126,11 @@ public void setUp() throws Exception { @Test public void testBrokerRegisterSuccess() throws Exception { + when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); ReplicasManager replicasManager0 = new ReplicasManager(mockedBrokerController); replicasManager0.start(); @@ -145,7 +150,7 @@ public void testBrokerRegisterSuccessAndRestartWithChangedBrokerConfig() throws when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); ReplicasManager replicasManager0 = new ReplicasManager(mockedBrokerController); replicasManager0.start(); @@ -197,7 +202,7 @@ public void testRegisterFailedAtCreateTempFile() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager); PowerMockito.doReturn(false).when(spyReplicasManager, "createTempMetadataFile", anyLong()); @@ -217,7 +222,7 @@ public void testRegisterFailedAtApplyBrokerIdFailed() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenThrow(new RuntimeException()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); replicasManager.start(); @@ -237,7 +242,7 @@ public void testRegisterFailedAtCreateMetadataFileAndDeleteTemp() throws Excepti when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager); PowerMockito.doReturn(false).when(spyReplicasManager, "createMetadataFileAndDeleteTemp"); @@ -280,7 +285,7 @@ public void testRegisterFailedAtRegisterSuccess() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenThrow(new RuntimeException()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); replicasManager.start(); @@ -299,7 +304,7 @@ public void testRegisterFailedAtRegisterSuccess() throws Exception { replicasManager.shutdown(); Mockito.reset(mockedBrokerOuterAPI); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); when(mockedBrokerOuterAPI.getControllerMetaData(any())).thenReturn( new GetMetaDataResponseHeader("default-group", "dledger-a", CONTROLLER_ADDR, true, CONTROLLER_ADDR)); when(mockedBrokerOuterAPI.checkAddressReachable(any())).thenReturn(true); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java index a5cf63d371c..54937e56c41 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.broker.controller; import java.io.File; +import java.util.Arrays; +import java.util.HashSet; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -116,6 +118,10 @@ public class ReplicasManagerTest { private static final Long SYNC_STATE = 1L; + private static final HashSet SYNC_STATE_SET_1 = new HashSet(Arrays.asList(BROKER_ID_1)); + + private static final HashSet SYNC_STATE_SET_2 = new HashSet(Arrays.asList(BROKER_ID_2)); + @Before public void before() throws Exception { UtilAll.deleteFile(new File(STORE_BASE_PATH)); @@ -134,6 +140,7 @@ public void before() throws Exception { brokerTryElectResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS); brokerTryElectResponseHeader.setMasterEpoch(OLD_MASTER_EPOCH); brokerTryElectResponseHeader.setSyncStateSetEpoch(OLD_MASTER_EPOCH); + brokerTryElectResponseHeader.setSyncStateSet(SYNC_STATE_SET_1); getReplicaInfoResponseHeader = new GetReplicaInfoResponseHeader(); getReplicaInfoResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS); getReplicaInfoResponseHeader.setMasterBrokerId(BROKER_ID_1); @@ -173,18 +180,24 @@ public void after() { @Test public void changeBrokerRoleTest() { + HashSet syncStateSetA = new HashSet<>(); + syncStateSetA.add(BROKER_ID_1); + HashSet syncStateSetB = new HashSet<>(); + syncStateSetA.add(BROKER_ID_2); // not equal to localAddress - Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(BROKER_ID_2, NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH)) + Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(BROKER_ID_2, NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, syncStateSetB)) .doesNotThrowAnyException(); // equal to localAddress - Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(BROKER_ID_1, OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH)) + Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(BROKER_ID_1, OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, syncStateSetA)) .doesNotThrowAnyException(); } @Test public void changeToMasterTest() { - Assertions.assertThatCode(() -> replicasManager.changeToMaster(NEW_MASTER_EPOCH, OLD_MASTER_EPOCH)).doesNotThrowAnyException(); + HashSet syncStateSet = new HashSet<>(); + syncStateSet.add(BROKER_ID_1); + Assertions.assertThatCode(() -> replicasManager.changeToMaster(NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, syncStateSet)).doesNotThrowAnyException(); } @Test diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java index 18e9992d31c..d79f86e7649 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java @@ -177,7 +177,7 @@ public void doNotifyBrokerRoleChanged(final String brokerAddr, final RoleChangeN if (StringUtils.isNoneEmpty(brokerAddr)) { log.info("Try notify broker {} that role changed, RoleChangeNotifyEntry:{}", brokerAddr, entry); final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(entry.getMasterAddress(), entry.getMasterBrokerId(), - entry.getMasterEpoch(), entry.getSyncStateSetEpoch()); + entry.getMasterEpoch(), entry.getSyncStateSetEpoch(), entry.getSyncStateSet()); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, requestHeader); try { this.remotingClient.invokeOneway(brokerAddr, request, 3000); diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index 728cf87e211..2c23fc25acd 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -201,6 +201,7 @@ public ControllerResult electMaster(final ElectMaster response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch()); response.setMasterBrokerId(oldMaster); response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(oldMaster)); + response.setSyncStateSet(syncStateSet); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err); return result; } @@ -209,10 +210,14 @@ public ControllerResult electMaster(final ElectMaster if (newMaster != null) { final int masterEpoch = syncStateInfo.getMasterEpoch(); final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch(); + final HashSet newSyncStateSet = new HashSet<>(); + newSyncStateSet.add(newMaster); + response.setMasterBrokerId(newMaster); response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(newMaster)); response.setMasterEpoch(masterEpoch + 1); response.setSyncStateSetEpoch(syncStateSetEpoch + 1); + response.setSyncStateSet(newSyncStateSet); BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerName); if (null != brokerMemberGroup) { result.setBody(brokerMemberGroup.encode()); @@ -313,6 +318,7 @@ public ControllerResult registerBroker response.setMasterBrokerId(syncStateInfo.getMasterBrokerId()); response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(response.getMasterBrokerId())); response.setMasterEpoch(syncStateInfo.getMasterEpoch()); + response.setSyncStateSet(syncStateInfo.getSyncStateSet()); response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch()); } // if this broker's address has been changed, we need to update it diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java index 4f3f31218f3..60242de2fe6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java @@ -22,6 +22,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; +import java.util.Set; + public class RoleChangeNotifyEntry { private final BrokerMemberGroup brokerMemberGroup; @@ -34,12 +36,15 @@ public class RoleChangeNotifyEntry { private final int syncStateSetEpoch; - public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String masterAddress, Long masterBrokerId, int masterEpoch, int syncStateSetEpoch) { + private final Set syncStateSet; + + public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String masterAddress, Long masterBrokerId, int masterEpoch, int syncStateSetEpoch, Set syncStateSet) { this.brokerMemberGroup = brokerMemberGroup; this.masterAddress = masterAddress; this.masterEpoch = masterEpoch; this.syncStateSetEpoch = syncStateSetEpoch; this.masterBrokerId = masterBrokerId; + this.syncStateSet = syncStateSet; } public static RoleChangeNotifyEntry convert(RemotingCommand electMasterResponse) { @@ -48,7 +53,7 @@ public static RoleChangeNotifyEntry convert(RemotingCommand electMasterResponse) if (electMasterResponse.getBody() != null && electMasterResponse.getBody().length > 0) { brokerMemberGroup = RemotingSerializable.decode(electMasterResponse.getBody(), BrokerMemberGroup.class); } - return new RoleChangeNotifyEntry(brokerMemberGroup, header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), header.getSyncStateSetEpoch()); + return new RoleChangeNotifyEntry(brokerMemberGroup, header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), header.getSyncStateSetEpoch(), header.getSyncStateSet()); } @@ -71,4 +76,8 @@ public int getSyncStateSetEpoch() { public Long getMasterBrokerId() { return masterBrokerId; } + + public Set getSyncStateSet() { + return syncStateSet; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java index 3a112a57824..5b998a929de 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java @@ -19,20 +19,24 @@ import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import java.util.Set; + public class NotifyBrokerRoleChangedRequestHeader implements CommandCustomHeader { private String masterAddress; private Integer masterEpoch; private Integer syncStateSetEpoch; private Long masterBrokerId; + private Set syncStateSet; public NotifyBrokerRoleChangedRequestHeader() { } - public NotifyBrokerRoleChangedRequestHeader(String masterAddress, Long masterBrokerId, Integer masterEpoch, Integer syncStateSetEpoch) { + public NotifyBrokerRoleChangedRequestHeader(String masterAddress, Long masterBrokerId, Integer masterEpoch, Integer syncStateSetEpoch, Set syncStateSet) { this.masterAddress = masterAddress; this.masterEpoch = masterEpoch; this.syncStateSetEpoch = syncStateSetEpoch; this.masterBrokerId = masterBrokerId; + this.syncStateSet = syncStateSet; } public String getMasterAddress() { @@ -67,6 +71,14 @@ public void setMasterBrokerId(Long masterBrokerId) { this.masterBrokerId = masterBrokerId; } + public Set getSyncStateSet() { + return syncStateSet; + } + + public void setSyncStateSet(Set syncStateSet) { + this.syncStateSet = syncStateSet; + } + @Override public String toString() { return "NotifyBrokerRoleChangedRequestHeader{" + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java index d3c8975383f..b9f2afb7a53 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java @@ -19,21 +19,25 @@ import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import java.util.Set; + public class ElectMasterResponseHeader implements CommandCustomHeader { private Long masterBrokerId; private String masterAddress; private Integer masterEpoch; private Integer syncStateSetEpoch; + private Set syncStateSet; public ElectMasterResponseHeader() { } - public ElectMasterResponseHeader(Long masterBrokerId, String masterAddress, Integer masterEpoch, Integer syncStateSetEpoch) { + public ElectMasterResponseHeader(Long masterBrokerId, String masterAddress, Integer masterEpoch, Integer syncStateSetEpoch, Set syncStateSet) { this.masterBrokerId = masterBrokerId; this.masterAddress = masterAddress; this.masterEpoch = masterEpoch; this.syncStateSetEpoch = syncStateSetEpoch; + this.syncStateSet = syncStateSet; } public String getMasterAddress() { @@ -68,6 +72,14 @@ public Long getMasterBrokerId() { return masterBrokerId; } + public void setSyncStateSet(Set syncStateSet) { + this.syncStateSet = syncStateSet; + } + + public Set getSyncStateSet() { + return this.syncStateSet; + } + @Override public String toString() { return "ElectMasterResponseHeader{" + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerResponseHeader.java index 66bf0e44151..f97fdeb48dc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerResponseHeader.java @@ -20,6 +20,8 @@ import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import java.util.Set; + public class RegisterBrokerToControllerResponseHeader implements CommandCustomHeader { private String clusterName; @@ -34,6 +36,8 @@ public class RegisterBrokerToControllerResponseHeader implements CommandCustomHe private Integer syncStateSetEpoch; + private Set syncStateSet; + @Override public void checkFields() throws RemotingCommandException { @@ -94,4 +98,12 @@ public void setClusterName(String clusterName) { public void setBrokerName(String brokerName) { this.brokerName = brokerName; } + + public Set getSyncStateSet() { + return syncStateSet; + } + + public void setSyncStateSet(Set syncStateSet) { + this.syncStateSet = syncStateSet; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java index 42586145521..683018b67ca 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java @@ -246,6 +246,16 @@ public Set maybeShrinkSyncStateSet() { } } } + + // If the slaveBrokerId is in syncStateSet but not in connectionCaughtUpTimeTable, + // it means that the broker has not connected. + for (Long slaveBrokerId : newSyncStateSet) { + if (!this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId)) { + newSyncStateSet.remove(slaveBrokerId); + isSyncStateSetChanged = true; + } + } + if (isSyncStateSetChanged) { markSynchronizingSyncStateSet(newSyncStateSet); } From 159134723d080e5c10833924f2dde0dc6933c9e1 Mon Sep 17 00:00:00 2001 From: juntao Date: Wed, 15 Mar 2023 13:15:37 +0800 Subject: [PATCH 2/5] refactor --- .../rocketmq/broker/controller/ReplicasManagerRegisterTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java index 7c4276390bc..56225b2fe3d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java @@ -45,7 +45,6 @@ import java.io.File; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.UUID; From 7e193f8d71a55663505b9de9f292c8c8bcaf54e3 Mon Sep 17 00:00:00 2001 From: juntao Date: Wed, 15 Mar 2023 19:15:12 +0800 Subject: [PATCH 3/5] correct the syncStateSet in elect process. --- .../broker/controller/ReplicasManager.java | 6 +- .../rocketmq/broker/out/BrokerOuterAPI.java | 8 +- .../processor/AdminBrokerProcessor.java | 4 +- .../ReplicasManagerRegisterTest.java | 15 ++-- .../controller/ReplicasManagerTest.java | 3 +- .../controller/ControllerManager.java | 4 +- .../impl/manager/ReplicasInfoManager.java | 11 ++- .../body/ElectMasterResponseBody.java | 90 +++++++++++++++++++ .../protocol/body/RoleChangeNotifyEntry.java | 9 +- .../NotifyBrokerRoleChangedRequestHeader.java | 12 +-- .../controller/ElectMasterResponseHeader.java | 13 +-- 11 files changed, 132 insertions(+), 43 deletions(-) create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index 676488fd215..54a3d74b6ab 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -364,8 +364,10 @@ private void handleSlaveSynchronize(final BrokerRole role) { private boolean brokerElect() { // Broker try to elect itself as a master in broker set. try { - ElectMasterResponseHeader tryElectResponse = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(), + Pair> tryElectResponsePair = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.brokerControllerId); + ElectMasterResponseHeader tryElectResponse = tryElectResponsePair.getObject1(); + Set syncStateSet = tryElectResponsePair.getObject2(); final String masterAddress = tryElectResponse.getMasterAddress(); final Long masterBrokerId = tryElectResponse.getMasterBrokerId(); if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) { @@ -374,7 +376,7 @@ private boolean brokerElect() { } if (masterBrokerId.equals(this.brokerControllerId)) { - changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch(), tryElectResponse.getSyncStateSet()); + changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch(), syncStateSet); } else { changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId()); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 4f298e1aaec..bff89ad8237 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -80,6 +80,7 @@ import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.body.ElectMasterResponseBody; import org.apache.rocketmq.remoting.protocol.body.GetBrokerMemberGroupResponseBody; import org.apache.rocketmq.remoting.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; @@ -1172,7 +1173,7 @@ public SyncStateSet alterSyncStateSet( /** * Broker try to elect itself as a master in broker set */ - public ElectMasterResponseHeader brokerElect(String controllerAddress, String clusterName, String brokerName, + public Pair> brokerElect(String controllerAddress, String clusterName, String brokerName, Long brokerId) throws Exception { final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId); @@ -1188,8 +1189,11 @@ public ElectMasterResponseHeader brokerElect(String controllerAddress, String cl case CONTROLLER_ELECT_MASTER_FAILED: case CONTROLLER_MASTER_STILL_EXIST: case SUCCESS: - return (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class); + final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class); + final ElectMasterResponseBody responseBody = RemotingSerializable.decode(response.getBody(), ElectMasterResponseBody.class); + return new Pair<>(responseHeader, responseBody.getSyncStateSet()); } + throw new MQBrokerException(response.getCode(), response.getRemark()); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index d53c969f20d..53729a43bca 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -111,6 +111,7 @@ import org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody; import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody; +import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; @@ -2628,6 +2629,7 @@ private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx, private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { NotifyBrokerRoleChangedRequestHeader requestHeader = (NotifyBrokerRoleChangedRequestHeader) request.decodeCommandCustomHeader(NotifyBrokerRoleChangedRequestHeader.class); + SyncStateSet syncStateSetInfo = RemotingSerializable.decode(request.getBody(), SyncStateSet.class); RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -2635,7 +2637,7 @@ private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx, final ReplicasManager replicasManager = this.brokerController.getReplicasManager(); if (replicasManager != null) { - replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), requestHeader.getSyncStateSet()); + replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), syncStateSetInfo.getSyncStateSet()); } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java index 56225b2fe3d..4eed8a8a59f 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java @@ -22,6 +22,7 @@ import org.apache.rocketmq.broker.slave.SlaveSynchronize; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; @@ -129,7 +130,7 @@ public void testBrokerRegisterSuccess() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); ReplicasManager replicasManager0 = new ReplicasManager(mockedBrokerController); replicasManager0.start(); @@ -149,7 +150,7 @@ public void testBrokerRegisterSuccessAndRestartWithChangedBrokerConfig() throws when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); ReplicasManager replicasManager0 = new ReplicasManager(mockedBrokerController); replicasManager0.start(); @@ -201,7 +202,7 @@ public void testRegisterFailedAtCreateTempFile() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager); PowerMockito.doReturn(false).when(spyReplicasManager, "createTempMetadataFile", anyLong()); @@ -221,7 +222,7 @@ public void testRegisterFailedAtApplyBrokerIdFailed() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenThrow(new RuntimeException()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); replicasManager.start(); @@ -241,7 +242,7 @@ public void testRegisterFailedAtCreateMetadataFileAndDeleteTemp() throws Excepti when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager); PowerMockito.doReturn(false).when(spyReplicasManager, "createMetadataFileAndDeleteTemp"); @@ -284,7 +285,7 @@ public void testRegisterFailedAtRegisterSuccess() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenThrow(new RuntimeException()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); replicasManager.start(); @@ -303,7 +304,7 @@ public void testRegisterFailedAtRegisterSuccess() throws Exception { replicasManager.shutdown(); Mockito.reset(mockedBrokerOuterAPI); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1, syncStateSet)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); when(mockedBrokerOuterAPI.getControllerMetaData(any())).thenReturn( new GetMetaDataResponseHeader("default-group", "dledger-a", CONTROLLER_ADDR, true, CONTROLLER_ADDR)); when(mockedBrokerOuterAPI.checkAddressReachable(any())).thenReturn(true); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java index 54937e56c41..da1691444cd 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java @@ -140,7 +140,6 @@ public void before() throws Exception { brokerTryElectResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS); brokerTryElectResponseHeader.setMasterEpoch(OLD_MASTER_EPOCH); brokerTryElectResponseHeader.setSyncStateSetEpoch(OLD_MASTER_EPOCH); - brokerTryElectResponseHeader.setSyncStateSet(SYNC_STATE_SET_1); getReplicaInfoResponseHeader = new GetReplicaInfoResponseHeader(); getReplicaInfoResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS); getReplicaInfoResponseHeader.setMasterBrokerId(BROKER_ID_1); @@ -163,7 +162,7 @@ public void before() throws Exception { when(brokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(applyBrokerIdResponseHeader); when(brokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(registerBrokerToControllerResponseHeader); when(brokerOuterAPI.getReplicaInfo(any(), any())).thenReturn(result); - when(brokerOuterAPI.brokerElect(any(), any(), any(), any())).thenReturn(brokerTryElectResponseHeader); + when(brokerOuterAPI.brokerElect(any(), any(), any(), any())).thenReturn(new Pair<>(brokerTryElectResponseHeader, SYNC_STATE_SET_1)); replicasManager = new ReplicasManager(brokerController); autoSwitchHAService.init(defaultMessageStore); replicasManager.start(); diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java index d79f86e7649..3da861c99a7 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java @@ -48,6 +48,7 @@ import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry; +import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; import org.apache.rocketmq.remoting.protocol.header.NotifyBrokerRoleChangedRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; @@ -177,8 +178,9 @@ public void doNotifyBrokerRoleChanged(final String brokerAddr, final RoleChangeN if (StringUtils.isNoneEmpty(brokerAddr)) { log.info("Try notify broker {} that role changed, RoleChangeNotifyEntry:{}", brokerAddr, entry); final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(entry.getMasterAddress(), entry.getMasterBrokerId(), - entry.getMasterEpoch(), entry.getSyncStateSetEpoch(), entry.getSyncStateSet()); + entry.getMasterEpoch(), entry.getSyncStateSetEpoch()); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, requestHeader); + request.setBody(new SyncStateSet(entry.getSyncStateSet(), entry.getSyncStateSetEpoch()).encode()); try { this.remotingClient.invokeOneway(brokerAddr, request, 3000); } catch (final Exception e) { diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index 2c23fc25acd..b85b2f30914 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -44,6 +44,7 @@ import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; +import org.apache.rocketmq.remoting.protocol.body.ElectMasterResponseBody; import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetResponseHeader; @@ -201,7 +202,8 @@ public ControllerResult electMaster(final ElectMaster response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch()); response.setMasterBrokerId(oldMaster); response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(oldMaster)); - response.setSyncStateSet(syncStateSet); + + result.setBody(new ElectMasterResponseBody(syncStateSet).encode()); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err); return result; } @@ -217,11 +219,14 @@ public ControllerResult electMaster(final ElectMaster response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(newMaster)); response.setMasterEpoch(masterEpoch + 1); response.setSyncStateSetEpoch(syncStateSetEpoch + 1); - response.setSyncStateSet(newSyncStateSet); + ElectMasterResponseBody responseBody = new ElectMasterResponseBody(newSyncStateSet); + BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerName); if (null != brokerMemberGroup) { - result.setBody(brokerMemberGroup.encode()); + responseBody.setBrokerMemberGroup(brokerMemberGroup); } + + result.setBody(responseBody.encode()); final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster); result.addEvent(event); return result; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java new file mode 100644 index 00000000000..7e15894b776 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.body; + +import com.google.common.base.Objects; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class ElectMasterResponseBody extends RemotingSerializable { + private BrokerMemberGroup brokerMemberGroup; + private Set syncStateSet; + + // Provide default constructor for serializer + public ElectMasterResponseBody() { + this.syncStateSet = new HashSet(); + this.brokerMemberGroup = null; + } + + public ElectMasterResponseBody(final Set syncStateSet) { + this.syncStateSet = syncStateSet; + this.brokerMemberGroup = null; + } + + public ElectMasterResponseBody(final BrokerMemberGroup brokerMemberGroup, final Set syncStateSet) { + this.brokerMemberGroup = brokerMemberGroup; + this.syncStateSet = syncStateSet; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ElectMasterResponseBody that = (ElectMasterResponseBody) o; + return Objects.equal(brokerMemberGroup, that.brokerMemberGroup) && + Objects.equal(syncStateSet, that.syncStateSet); + } + + @Override + public int hashCode() { + return Objects.hashCode(brokerMemberGroup, syncStateSet); + } + + @Override + public String toString() { + return "BrokerMemberGroup{" + + "brokerMemberGroup='" + brokerMemberGroup.toString() + '\'' + + ", syncStateSet='" + syncStateSet.toString() + + '}'; + } + + public void setBrokerMemberGroup(BrokerMemberGroup brokerMemberGroup) { + this.brokerMemberGroup = brokerMemberGroup; + } + + public BrokerMemberGroup getBrokerMemberGroup() { + return brokerMemberGroup; + } + + public void setSyncStateSet(Set syncStateSet) { + this.syncStateSet = syncStateSet; + } + + public Set getSyncStateSet() { + return syncStateSet; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java index 60242de2fe6..ab25df0d1d0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java @@ -50,10 +50,15 @@ public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String masterA public static RoleChangeNotifyEntry convert(RemotingCommand electMasterResponse) { final ElectMasterResponseHeader header = (ElectMasterResponseHeader) electMasterResponse.readCustomHeader(); BrokerMemberGroup brokerMemberGroup = null; + Set syncStateSet = null; + if (electMasterResponse.getBody() != null && electMasterResponse.getBody().length > 0) { - brokerMemberGroup = RemotingSerializable.decode(electMasterResponse.getBody(), BrokerMemberGroup.class); + ElectMasterResponseBody body = RemotingSerializable.decode(electMasterResponse.getBody(), ElectMasterResponseBody.class); + brokerMemberGroup = body.getBrokerMemberGroup(); + syncStateSet = body.getSyncStateSet(); } - return new RoleChangeNotifyEntry(brokerMemberGroup, header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), header.getSyncStateSetEpoch(), header.getSyncStateSet()); + + return new RoleChangeNotifyEntry(brokerMemberGroup, header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), header.getSyncStateSetEpoch(), syncStateSet); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java index 5b998a929de..2405507ca7c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java @@ -26,17 +26,15 @@ public class NotifyBrokerRoleChangedRequestHeader implements CommandCustomHeader private Integer masterEpoch; private Integer syncStateSetEpoch; private Long masterBrokerId; - private Set syncStateSet; public NotifyBrokerRoleChangedRequestHeader() { } - public NotifyBrokerRoleChangedRequestHeader(String masterAddress, Long masterBrokerId, Integer masterEpoch, Integer syncStateSetEpoch, Set syncStateSet) { + public NotifyBrokerRoleChangedRequestHeader(String masterAddress, Long masterBrokerId, Integer masterEpoch, Integer syncStateSetEpoch) { this.masterAddress = masterAddress; this.masterEpoch = masterEpoch; this.syncStateSetEpoch = syncStateSetEpoch; this.masterBrokerId = masterBrokerId; - this.syncStateSet = syncStateSet; } public String getMasterAddress() { @@ -71,14 +69,6 @@ public void setMasterBrokerId(Long masterBrokerId) { this.masterBrokerId = masterBrokerId; } - public Set getSyncStateSet() { - return syncStateSet; - } - - public void setSyncStateSet(Set syncStateSet) { - this.syncStateSet = syncStateSet; - } - @Override public String toString() { return "NotifyBrokerRoleChangedRequestHeader{" + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java index b9f2afb7a53..aaf3b10b829 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java @@ -19,7 +19,6 @@ import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import java.util.Set; public class ElectMasterResponseHeader implements CommandCustomHeader { @@ -27,17 +26,15 @@ public class ElectMasterResponseHeader implements CommandCustomHeader { private String masterAddress; private Integer masterEpoch; private Integer syncStateSetEpoch; - private Set syncStateSet; public ElectMasterResponseHeader() { } - public ElectMasterResponseHeader(Long masterBrokerId, String masterAddress, Integer masterEpoch, Integer syncStateSetEpoch, Set syncStateSet) { + public ElectMasterResponseHeader(Long masterBrokerId, String masterAddress, Integer masterEpoch, Integer syncStateSetEpoch) { this.masterBrokerId = masterBrokerId; this.masterAddress = masterAddress; this.masterEpoch = masterEpoch; this.syncStateSetEpoch = syncStateSetEpoch; - this.syncStateSet = syncStateSet; } public String getMasterAddress() { @@ -72,14 +69,6 @@ public Long getMasterBrokerId() { return masterBrokerId; } - public void setSyncStateSet(Set syncStateSet) { - this.syncStateSet = syncStateSet; - } - - public Set getSyncStateSet() { - return this.syncStateSet; - } - @Override public String toString() { return "ElectMasterResponseHeader{" + From ec7acfb8da9aae739b83018d4d0483c158ea427e Mon Sep 17 00:00:00 2001 From: juntao Date: Wed, 15 Mar 2023 19:46:21 +0800 Subject: [PATCH 4/5] move syncStateSet into request/response's body. --- .../rocketmq/broker/controller/ReplicasManager.java | 8 +++++--- .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 6 ++++-- .../controller/ReplicasManagerRegisterTest.java | 12 ++++++------ .../broker/controller/ReplicasManagerTest.java | 2 +- .../controller/impl/manager/ReplicasInfoManager.java | 2 +- .../protocol/body/ElectMasterResponseBody.java | 4 ---- .../header/NotifyBrokerRoleChangedRequestHeader.java | 2 -- .../RegisterBrokerToControllerResponseHeader.java | 12 ------------ 8 files changed, 17 insertions(+), 31 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index 54a3d74b6ab..b638f1bdb9c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -545,15 +545,17 @@ private boolean createMetadataFileAndDeleteTemp() { */ private boolean registerBrokerToController() { try { - RegisterBrokerToControllerResponseHeader response = this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, controllerLeaderAddress); - if (response == null) return false; + Pair> responsePair = this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, controllerLeaderAddress); + if (responsePair == null) return false; + RegisterBrokerToControllerResponseHeader response = responsePair.getObject1(); + Set syncStateSet = responsePair.getObject2(); final Long masterBrokerId = response.getMasterBrokerId(); final String masterAddress = response.getMasterAddress(); if (masterBrokerId == null) { return true; } if (this.brokerControllerId.equals(masterBrokerId)) { - changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch(), response.getSyncStateSet()); + changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch(), syncStateSet); } else { changeToSlave(masterAddress, response.getMasterEpoch(), masterBrokerId); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index bff89ad8237..532d73ac979 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -1219,13 +1219,15 @@ public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final throw new MQBrokerException(response.getCode(), response.getRemark()); } - public RegisterBrokerToControllerResponseHeader registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception { + public Pair> registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception { final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerId, brokerAddress); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader); final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); assert response != null; if (response.getCode() == SUCCESS) { - return (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class); + RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class); + Set syncStateSet = RemotingSerializable.decode(response.getBody(), SyncStateSet.class).getSyncStateSet(); + return new Pair<>(responseHeader, syncStateSet); } throw new MQBrokerException(response.getCode(), response.getRemark()); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java index 4eed8a8a59f..7fb9d9aeb59 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java @@ -129,7 +129,7 @@ public void testBrokerRegisterSuccess() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); - when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); + when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), syncStateSet)); when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); ReplicasManager replicasManager0 = new ReplicasManager(mockedBrokerController); @@ -149,7 +149,7 @@ public void testBrokerRegisterSuccess() throws Exception { public void testBrokerRegisterSuccessAndRestartWithChangedBrokerConfig() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); - when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); + when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), syncStateSet)); when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); ReplicasManager replicasManager0 = new ReplicasManager(mockedBrokerController); @@ -201,7 +201,7 @@ public void testRegisterFailedAtCreateTempFile() throws Exception { ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController); when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); - when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); + when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), syncStateSet)); when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager); PowerMockito.doReturn(false).when(spyReplicasManager, "createTempMetadataFile", anyLong()); @@ -221,7 +221,7 @@ public void testRegisterFailedAtApplyBrokerIdFailed() throws Exception { ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController); when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenThrow(new RuntimeException()); - when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); + when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), syncStateSet)); when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); replicasManager.start(); @@ -241,7 +241,7 @@ public void testRegisterFailedAtCreateMetadataFileAndDeleteTemp() throws Excepti ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController); when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); - when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); + when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), syncStateSet)); when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager); @@ -315,7 +315,7 @@ public void testRegisterFailedAtRegisterSuccess() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 2L)); // because apply brokerId: 1 has succeeded, so next request which try to apply brokerId: 1 will be failed when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), eq(1L), any(), any())).thenThrow(new RuntimeException()); - when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); + when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), syncStateSet)); replicasManagerNew.start(); Assert.assertEquals(ReplicasManager.State.RUNNING, replicasManagerNew.getState()); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java index da1691444cd..e03828cff40 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java @@ -160,7 +160,7 @@ public void before() throws Exception { when(brokerOuterAPI.checkAddressReachable(any())).thenReturn(true); when(brokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(getNextBrokerIdResponseHeader); when(brokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(applyBrokerIdResponseHeader); - when(brokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(registerBrokerToControllerResponseHeader); + when(brokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), SYNC_STATE_SET_1)); when(brokerOuterAPI.getReplicaInfo(any(), any())).thenReturn(result); when(brokerOuterAPI.brokerElect(any(), any(), any(), any())).thenReturn(new Pair<>(brokerTryElectResponseHeader, SYNC_STATE_SET_1)); replicasManager = new ReplicasManager(brokerController); diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index b85b2f30914..103cb68e249 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -323,9 +323,9 @@ public ControllerResult registerBroker response.setMasterBrokerId(syncStateInfo.getMasterBrokerId()); response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(response.getMasterBrokerId())); response.setMasterEpoch(syncStateInfo.getMasterEpoch()); - response.setSyncStateSet(syncStateInfo.getSyncStateSet()); response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch()); } + result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode()); // if this broker's address has been changed, we need to update it if (!brokerAddress.equals(brokerReplicaInfo.getBrokerAddress(brokerId))) { final UpdateBrokerAddressEvent event = new UpdateBrokerAddressEvent(clusterName, brokerName, brokerAddress, brokerId); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java index 7e15894b776..8aef636fa4c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java @@ -19,11 +19,7 @@ import com.google.common.base.Objects; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - -import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; -import java.util.Map; import java.util.Set; public class ElectMasterResponseBody extends RemotingSerializable { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java index 2405507ca7c..3a112a57824 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java @@ -19,8 +19,6 @@ import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import java.util.Set; - public class NotifyBrokerRoleChangedRequestHeader implements CommandCustomHeader { private String masterAddress; private Integer masterEpoch; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerResponseHeader.java index f97fdeb48dc..66bf0e44151 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterBrokerToControllerResponseHeader.java @@ -20,8 +20,6 @@ import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import java.util.Set; - public class RegisterBrokerToControllerResponseHeader implements CommandCustomHeader { private String clusterName; @@ -36,8 +34,6 @@ public class RegisterBrokerToControllerResponseHeader implements CommandCustomHe private Integer syncStateSetEpoch; - private Set syncStateSet; - @Override public void checkFields() throws RemotingCommandException { @@ -98,12 +94,4 @@ public void setClusterName(String clusterName) { public void setBrokerName(String brokerName) { this.brokerName = brokerName; } - - public Set getSyncStateSet() { - return syncStateSet; - } - - public void setSyncStateSet(Set syncStateSet) { - this.syncStateSet = syncStateSet; - } } From 77893ef69a444733d0d7483dd648d2062be505ab Mon Sep 17 00:00:00 2001 From: juntao Date: Thu, 16 Mar 2023 10:34:13 +0800 Subject: [PATCH 5/5] optimize the broker electing switch's branch. --- .../java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 532d73ac979..144f05016b9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -1185,8 +1185,8 @@ public Pair> brokerElect(String controllerA throw new MQBrokerException(response.getCode(), "Controller leader was changed"); } case CONTROLLER_BROKER_NEED_TO_BE_REGISTERED: - throw new MQBrokerException(response.getCode(), response.getRemark()); case CONTROLLER_ELECT_MASTER_FAILED: + throw new MQBrokerException(response.getCode(), response.getRemark()); case CONTROLLER_MASTER_STILL_EXIST: case SUCCESS: final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);