Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5989] Support unique broker-id as identification in controller mode #6100

Merged
merged 50 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
ec78ddd
[ISSUE#5045] Refactor the register and elect-master process in contro…
TheR1sing3un Feb 1, 2023
accb7df
Resolve the conflict
RongtongJin Feb 1, 2023
1397aa0
feat(controller): refactor broker's information recording core from i…
TheR1sing3un Feb 3, 2023
9103e69
feat(controller): add protocols about new register flow
TheR1sing3un Feb 3, 2023
592f5e2
refactor code in module: store/ha for persistence broker-id
TheR1sing3un Feb 4, 2023
8c6266c
feat(broker): implement the general register to controller protocol
TheR1sing3un Feb 5, 2023
e65caed
feat(controller): implement the general register to controller protoc…
TheR1sing3un Feb 5, 2023
e61033f
feat(controller): implement logic about dealing with UpdateBrokerAddr…
TheR1sing3un Feb 5, 2023
ac2f859
feat(controller): Improved logic and adaptation testing for persisten…
TheR1sing3un Feb 5, 2023
7e6c6de
feat(broker): perfect logic test in broker
TheR1sing3un Feb 5, 2023
08c057e
feat(broker): perfect ReplicaManagerTest.java
TheR1sing3un Feb 5, 2023
70eb782
feat(broker): fix some bugs to successfully compile project
TheR1sing3un Feb 6, 2023
17c082a
fix(controller): fix some bug about wrong type comparison
TheR1sing3un Feb 6, 2023
e2b4b53
fix(controller): fix some bug about ignoring new-add event type
TheR1sing3un Feb 6, 2023
01b699e
fix(controller): fix some bug to pass AutoSwitchRoleIntegrationTest
TheR1sing3un Feb 6, 2023
077cb36
style(controller): remove unused import
TheR1sing3un Feb 6, 2023
4594574
[ISSUE #6023] Add a unit test to verify new register process in broke…
TheR1sing3un Feb 10, 2023
2aa35bc
fix(broker): mark unless attribute with @Deprecated
TheR1sing3un Feb 16, 2023
ea518c8
test(broker): add test to verify broker restart with changed address
TheR1sing3un Feb 16, 2023
cb1f82e
refactor(controller): optimize some code suggested by reviewer
TheR1sing3un Feb 22, 2023
2147432
test(test): add AutoSwitchRoleIntegration#testBasicWorkWhenController…
TheR1sing3un Mar 3, 2023
8470996
fix(controller): return failure when receive a heartbeat with empty b…
TheR1sing3un Mar 5, 2023
79baedf
fix(store): fix wrong value of MessageStoreConfig#storePathMetadata a…
TheR1sing3un Mar 5, 2023
5cffe15
fix(admin): print masterBrokerId when calling GetSyncStateSet
TheR1sing3un Mar 5, 2023
024e497
docs(docs): add docs about how to update to BrokerId version
TheR1sing3un Mar 6, 2023
161100a
refactor(controller): remove meaningless attribute `MessageStoreConfi…
TheR1sing3un Mar 6, 2023
ad4651d
feat(broker): check metadata if valid when register
TheR1sing3un Mar 6, 2023
aa412aa
Fix some typos and log output
RongtongJin Mar 7, 2023
7686461
fix(broker): set isolate's value to false to normally register broker…
TheR1sing3un Mar 7, 2023
bb2ac67
Modify StorePathMetadata to StorePathBrokerIdentity
RongtongJin Mar 9, 2023
9a683b7
feat(broker): Random sleep within one second when broker register failed
TheR1sing3un Mar 9, 2023
bda9915
Make random an object variable
RongtongJin Mar 10, 2023
09d269b
Fix bug that state not match when handshake
RongtongJin Mar 10, 2023
cfa4a63
Polish the code structure and code style
RongtongJin Mar 11, 2023
81c5496
Rename cleanBrokerData subCommand to cleanBrokerMetadata
RongtongJin Mar 11, 2023
8c43329
refactor(broker): rename registerSuccess to registerBrokerToController
TheR1sing3un Mar 11, 2023
2a3544c
test(broker): fix forgetting set the changed cluster name broker conf…
TheR1sing3un Mar 11, 2023
31ca825
feat(broker): add more logs when broker register to controller
TheR1sing3un Mar 11, 2023
08a134f
fix(broker): fix wrong log
TheR1sing3un Mar 11, 2023
d39e519
fix(broker): fix wrong test
TheR1sing3un Mar 11, 2023
616b81a
fix(admin): fix incompatible command: CleanBrokerMeta
TheR1sing3un Mar 11, 2023
1596d9f
fix(controller): fix forgetting initialize BrokerHeartbeatManager
TheR1sing3un Mar 11, 2023
72de390
Fix CleanControllerBrokerMetaSubCommand and ReElectMasterSubCommand p…
RongtongJin Mar 12, 2023
e6619fb
test(controller): add more logs when register and refactor ReplicasMa…
TheR1sing3un Mar 12, 2023
eeba8fd
test(controller): try to fix flaky test
TheR1sing3un Mar 12, 2023
c341487
test(broker): optimize some test base store path
TheR1sing3un Mar 13, 2023
946cf69
fix(store): fix conflicts after rebase
TheR1sing3un Mar 13, 2023
b2c7421
test(broker): fix conflicts in test after rebase
TheR1sing3un Mar 13, 2023
f57d97e
test(controller): To pass `ControllerManagerTest` in Windows, we for…
TheR1sing3un Mar 13, 2023
d77e8bf
Fix some spelling problems
RongtongJin Mar 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ java_library(
"@maven//:org_assertj_assertj_core",
"@maven//:org_hamcrest_hamcrest_library",
"@maven//:org_mockito_mockito_core",
"@maven//:org_powermock_powermock_module_junit4",
"@maven//:org_powermock_powermock_api_mockito2",
"@maven//:org_hamcrest_hamcrest_core",
"@maven//:ch_qos_logback_logback_classic",
"@maven//:org_awaitility_awaitility",
Expand Down
3 changes: 3 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ maven_install(
"io.netty:netty-all:4.1.65.Final",
"org.assertj:assertj-core:3.22.0",
"org.mockito:mockito-core:3.10.0",
"org.powermock:powermock-module-junit4:2.0.9",
"org.powermock:powermock-api-mockito2:2.0.9",

"com.github.luben:zstd-jni:1.5.2-2",
"org.lz4:lz4-java:1.8.0",
"commons-validator:commons-validator:1.7",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
Expand Down Expand Up @@ -1733,24 +1732,7 @@ protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,

protected void sendHeartbeat() {
if (this.brokerConfig.isEnableControllerMode()) {
final List<String> controllerAddresses = this.replicasManager.getAvailableControllerAddresses();
for (String controllerAddress : controllerAddresses) {
if (StringUtils.isNotEmpty(controllerAddress)) {
this.brokerOuterAPI.sendHeartbeatToController(
controllerAddress,
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
this.brokerConfig.isInBrokerContainer(), this.replicasManager.getLastEpoch(),
this.messageStore.getMaxPhyOffset(),
this.replicasManager.getConfirmOffset(),
this.brokerConfig.getControllerHeartBeatTimeoutMills(),
this.brokerConfig.getBrokerElectionPriority()
);
}
}
this.replicasManager.sendHeartbeatToController();
}

if (this.brokerConfig.isEnableSlaveActingMaster()) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,21 @@
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
Expand All @@ -129,18 +135,21 @@

import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_METADATA_NOT_EXIST;
import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED;
import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_ELECT_MASTER_FAILED;
import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_MASTER_STILL_EXIST;
import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_NOT_LEADER;

public class BrokerOuterAPI {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final RemotingClient remotingClient;
private final TopAddressing topAddressing = new DefaultTopAddressing(MixAll.getWSAddr());
private final BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
private final ClientMetadata clientMetadata;
private final RpcClient rpcClient;
private String nameSrvAddr = null;
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));

private ClientMetadata clientMetadata;
private RpcClient rpcClient;

public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, new DynamicalExtFieldRPCHook(), new ClientMetadata());
Expand Down Expand Up @@ -207,7 +216,7 @@ public boolean checkAddressReachable(String address) {

public void updateNameServerAddressList(final String addrs) {
String[] addrArray = addrs.split(";");
List<String> lst = new ArrayList<>(Arrays.asList(addrArray));
List<String> lst = new ArrayList<String>(Arrays.asList(addrArray));
this.remotingClient.updateNameServerAddressList(lst);
}

Expand Down Expand Up @@ -1140,10 +1149,10 @@ public GetMetaDataResponseHeader getControllerMetaData(final String controllerAd
public SyncStateSet alterSyncStateSet(
final String controllerAddress,
final String brokerName,
final String masterAddress, final int masterEpoch,
final Set<String> newSyncStateSet, final int syncStateSetEpoch) throws Exception {
final Long masterBrokerId, final int masterEpoch,
final Set<Long> newSyncStateSet, final int syncStateSetEpoch) throws Exception {

final AlterSyncStateSetRequestHeader requestHeader = new AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch);
final AlterSyncStateSetRequestHeader requestHeader = new AlterSyncStateSetRequestHeader(brokerName, masterBrokerId, masterEpoch);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, requestHeader);
request.setBody(new SyncStateSet(newSyncStateSet, syncStateSetEpoch).encode());
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
Expand All @@ -1161,24 +1170,58 @@ public SyncStateSet alterSyncStateSet(
}

/**
* Register broker to controller
* Broker try to elect itself as a master in broker set
*/
public RegisterBrokerToControllerResponseHeader registerBrokerToController(
final String controllerAddress, final String clusterName,
final String brokerName, final String address, final long controllerHeartbeatTimeoutMills, final int epoch,
final long maxOffset, final int electionPriority) throws Exception {
public ElectMasterResponseHeader brokerElect(String controllerAddress, String clusterName, String brokerName,
Long brokerId) throws Exception {

final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, controllerHeartbeatTimeoutMills, epoch, maxOffset, electionPriority);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
switch (response.getCode()) {
case SUCCESS: {
return (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
}
case CONTROLLER_NOT_LEADER: {
throw new MQBrokerException(response.getCode(), "Controller leader was changed");
}
case CONTROLLER_BROKER_NEED_TO_BE_REGISTERED:
throw new MQBrokerException(response.getCode(), response.getRemark());
case CONTROLLER_ELECT_MASTER_FAILED:
case CONTROLLER_MASTER_STILL_EXIST:
case SUCCESS:
return (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public GetNextBrokerIdResponseHeader getNextBrokerId(final String clusterName, final String brokerName, final String controllerAddress) throws Exception {
final GetNextBrokerIdRequestHeader requestHeader = new GetNextBrokerIdRequestHeader(clusterName, brokerName);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_NEXT_BROKER_ID, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
if (response.getCode() == SUCCESS) {
return (GetNextBrokerIdResponseHeader) response.decodeCommandCustomHeader(GetNextBrokerIdResponseHeader.class);
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final String brokerName, final Long brokerId, final String registerCheckCode, final String controllerAddress) throws Exception {
final ApplyBrokerIdRequestHeader requestHeader = new ApplyBrokerIdRequestHeader(clusterName, brokerName, brokerId, registerCheckCode);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_APPLY_BROKER_ID, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
if (response.getCode() == SUCCESS) {
return (ApplyBrokerIdResponseHeader) response.decodeCommandCustomHeader(ApplyBrokerIdResponseHeader.class);
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public RegisterBrokerToControllerResponseHeader registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception {
final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerId, brokerAddress);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
if (response.getCode() == SUCCESS) {
return (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
Expand All @@ -1187,8 +1230,8 @@ public RegisterBrokerToControllerResponseHeader registerBrokerToController(
* Get broker replica info
*/
public Pair<GetReplicaInfoResponseHeader, SyncStateSet> getReplicaInfo(final String controllerAddress,
final String brokerName, final String brokerAddress) throws Exception {
final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader(brokerName, brokerAddress);
final String brokerName) throws Exception {
final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader(brokerName);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
Expand Down Expand Up @@ -1237,6 +1280,7 @@ public void sendHeartbeatToController(final String controllerAddress,
requestHeader.setConfirmOffset(confirmOffset);
requestHeader.setHeartbeatTimeoutMills(controllerHeartBeatTimeoutMills);
requestHeader.setElectionPriority(electionPriority);
requestHeader.setBrokerId(brokerId);
brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
@Override
public void run0() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2635,7 +2635,7 @@ private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx,

final ReplicasManager replicasManager = this.brokerController.getReplicasManager();
if (replicasManager != null) {
replicasManager.changeBrokerRole(requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), requestHeader.getBrokerId());
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
Expand Down
Loading