Skip to content

Commit

Permalink
[ISSUE apache#5712] Fix the invalid of heartbeat detection after cont…
Browse files Browse the repository at this point in the history
…roller switch (apache#5711)

* Fix the invalid of heartbeat detection after controller switch

* Pass the checkstyle

* Format ReplicasInfoManagerTest code style
  • Loading branch information
RongtongJin committed Jan 10, 2023
1 parent da33224 commit 96708de
Show file tree
Hide file tree
Showing 15 changed files with 398 additions and 316 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1736,7 +1736,9 @@ protected void sendHeartbeat() {
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
this.brokerConfig.isInBrokerContainer(), this.replicasManager.getLastEpoch(),
this.messageStore.getMaxPhyOffset(),
this.replicasManager.getConfirmOffset()
this.replicasManager.getConfirmOffset(),
this.brokerConfig.getControllerHeartBeatTimeoutMills(),
this.brokerConfig.getBrokerElectionPriority()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ private boolean registerBrokerToController() {
// Register this broker to controller, get brokerId and masterAddress.
try {
final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress,
this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getControllerHeartBeatTimeoutMills(),
this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority());
final String newMasterAddress = registerResponse.getMasterAddress();
if (StringUtils.isNoneEmpty(newMasterAddress)) {
Expand Down
318 changes: 161 additions & 157 deletions broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void before() throws Exception {
when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyLong(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result);
replicasManager = new ReplicasManager(brokerController);
autoSwitchHAService.init(defaultMessageStore);
Expand All @@ -145,7 +145,7 @@ public void changeBrokerRoleTest() {
.doesNotThrowAnyException();

// equal to localAddress
Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH , SLAVE_BROKER_ID))
Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, SLAVE_BROKER_ID))
.doesNotThrowAnyException();
}

Expand Down
56 changes: 52 additions & 4 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@

import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;

public class BrokerConfig extends BrokerIdentity {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

private String brokerConfigPath = null;

Expand Down Expand Up @@ -336,6 +332,10 @@ public class BrokerConfig extends BrokerIdentity {

private long syncControllerMetadataPeriod = 10 * 1000;

private long controllerHeartBeatTimeoutMills = 10 * 1000;

private boolean validateSystemTopicWhenUpdateTopic = true;

/**
* It is an important basis for the controller to choose the broker master.
* The lower the value of brokerElectionPriority, the higher the priority of the broker being selected as the master.
Expand Down Expand Up @@ -389,6 +389,14 @@ public boolean isEnable() {

private boolean metricsInDelta = false;

private long channelExpiredTimeout = 1000 * 120;
private long subscriptionExpiredTimeout = 1000 * 60 * 10;

/**
* Estimate accumulation or not when subscription filter type is tag and is not SUB_ALL.
*/
private boolean estimateAccumulation = true;

public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
Expand Down Expand Up @@ -1437,6 +1445,14 @@ public void setBrokerElectionPriority(int brokerElectionPriority) {
this.brokerElectionPriority = brokerElectionPriority;
}

public long getControllerHeartBeatTimeoutMills() {
return controllerHeartBeatTimeoutMills;
}

public void setControllerHeartBeatTimeoutMills(long controllerHeartBeatTimeoutMills) {
this.controllerHeartBeatTimeoutMills = controllerHeartBeatTimeoutMills;
}

public boolean isRecoverConcurrently() {
return recoverConcurrently;
}
Expand Down Expand Up @@ -1588,4 +1604,36 @@ public int getTransactionOpBatchInterval() {
public void setTransactionOpBatchInterval(int transactionOpBatchInterval) {
this.transactionOpBatchInterval = transactionOpBatchInterval;
}

public long getChannelExpiredTimeout() {
return channelExpiredTimeout;
}

public void setChannelExpiredTimeout(long channelExpiredTimeout) {
this.channelExpiredTimeout = channelExpiredTimeout;
}

public long getSubscriptionExpiredTimeout() {
return subscriptionExpiredTimeout;
}

public void setSubscriptionExpiredTimeout(long subscriptionExpiredTimeout) {
this.subscriptionExpiredTimeout = subscriptionExpiredTimeout;
}

public boolean isValidateSystemTopicWhenUpdateTopic() {
return validateSystemTopicWhenUpdateTopic;
}

public void setValidateSystemTopicWhenUpdateTopic(boolean validateSystemTopicWhenUpdateTopic) {
this.validateSystemTopicWhenUpdateTopic = validateSystemTopicWhenUpdateTopic;
}

public boolean isEstimateAccumulation() {
return estimateAccumulation;
}

public void setEstimateAccumulation(boolean estimateAccumulation) {
this.estimateAccumulation = estimateAccumulation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ public interface BrokerHeartbeatManager {
/**
* Broker new heartbeat.
*/
void onBrokerHeartbeat(final String clusterName, final String brokerAddr, final Integer epoch, final Long maxOffset, final Long confirmOffset);

/**
* Change the metadata(brokerId ..) for a broker.
*/
void changeBrokerMetadata(final String clusterName, final String brokerAddr, final Long brokerId);
void onBrokerHeartbeat(final String clusterName, final String brokerName, final String brokerAddr,
final Long brokerId, final Long timeoutMillis, final Channel channel, final Integer epoch,
final Long maxOffset, final Long confirmOffset, final Integer electionPriority);

/**
* Start heartbeat manager.
Expand All @@ -45,12 +42,6 @@ public interface BrokerHeartbeatManager {
*/
void addBrokerLifecycleListener(final BrokerLifecycleListener listener);

/**
* Register new broker to heartManager.
*/
void registerBroker(final String clusterName, final String brokerName, final String brokerAddr, final long brokerId,
final Long timeoutMillis, final Channel channel, final Integer epoch, final Long maxOffset, final Integer electionPriority);

/**
* Broker channel close
*/
Expand All @@ -70,6 +61,7 @@ interface BrokerLifecycleListener {
/**
* Trigger when broker inactive.
*/
void onBrokerInactive(final String clusterName, final String brokerName, final String brokerAddress, final long brokerId);
void onBrokerInactive(final String clusterName, final String brokerName, final String brokerAddress,
final long brokerId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@
*/
package org.apache.rocketmq.controller;


import io.netty.channel.Channel;


public class BrokerLiveInfo {
private final String brokerName;

private final String brokerAddr;
private final long heartbeatTimeoutMillis;
private long heartbeatTimeoutMillis;
private final Channel channel;
private long brokerId;
private long lastUpdateTimestamp;
Expand All @@ -33,8 +31,8 @@ public class BrokerLiveInfo {
private long confirmOffset;
private Integer electionPriority;

public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
Channel channel, int epoch, long maxOffset, Integer electionPriority) {
public BrokerLiveInfo(String brokerName, String brokerAddr, long brokerId, long lastUpdateTimestamp,
long heartbeatTimeoutMillis, Channel channel, int epoch, long maxOffset, Integer electionPriority) {
this.brokerName = brokerName;
this.brokerAddr = brokerAddr;
this.brokerId = brokerId;
Expand All @@ -46,8 +44,8 @@ public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long l
this.maxOffset = maxOffset;
}

public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
Channel channel, int epoch, long maxOffset, Integer electionPriority, long confirmOffset) {
public BrokerLiveInfo(String brokerName, String brokerAddr, long brokerId, long lastUpdateTimestamp,
long heartbeatTimeoutMillis, Channel channel, int epoch, long maxOffset, Integer electionPriority, long confirmOffset) {
this.brokerName = brokerName;
this.brokerAddr = brokerAddr;
this.brokerId = brokerId;
Expand All @@ -63,16 +61,16 @@ public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long l
@Override
public String toString() {
return "BrokerLiveInfo{" +
"brokerName='" + brokerName + '\'' +
", brokerAddr='" + brokerAddr + '\'' +
", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
", channel=" + channel +
", brokerId=" + brokerId +
", lastUpdateTimestamp=" + lastUpdateTimestamp +
", epoch=" + epoch +
", maxOffset=" + maxOffset +
", confirmOffset=" + confirmOffset +
'}';
"brokerName='" + brokerName + '\'' +
", brokerAddr='" + brokerAddr + '\'' +
", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
", channel=" + channel +
", brokerId=" + brokerId +
", lastUpdateTimestamp=" + lastUpdateTimestamp +
", epoch=" + epoch +
", maxOffset=" + maxOffset +
", confirmOffset=" + confirmOffset +
'}';
}

public String getBrokerName() {
Expand All @@ -83,6 +81,10 @@ public long getHeartbeatTimeoutMillis() {
return heartbeatTimeoutMillis;
}

public void setHeartbeatTimeoutMillis(long heartbeatTimeoutMillis) {
this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
}

public Channel getChannel() {
return channel;
}
Expand Down Expand Up @@ -138,4 +140,5 @@ public Integer getElectionPriority() {
public long getConfirmOffset() {
return confirmOffset;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.rocketmq.remoting.protocol.header.NotifyBrokerRoleChangedRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;

public class ControllerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
Expand All @@ -64,7 +66,7 @@ public class ControllerManager {
private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;

public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig) {
NettyClientConfig nettyClientConfig) {
this.controllerConfig = controllerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
Expand All @@ -77,12 +79,12 @@ public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig ne
public boolean initialize() {
this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
this.controllerRequestExecutor = new ThreadPoolExecutor(
this.controllerConfig.getControllerThreadPoolNums(),
this.controllerConfig.getControllerThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.controllerRequestThreadPoolQueue,
new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
this.controllerConfig.getControllerThreadPoolNums(),
this.controllerConfig.getControllerThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.controllerRequestThreadPoolQueue,
new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
@Override
protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
return new FutureTaskExt<>(runnable, value);
Expand All @@ -96,8 +98,8 @@ protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T valu
throw new IllegalArgumentException("Attribute value controllerDLegerSelfId of ControllerConfig is null or empty");
}
this.controller = new DLedgerController(this.controllerConfig, this.heartbeatManager::isBrokerActive,
this.nettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService,
new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo));
this.nettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService,
new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo));

// Register broker inactive listener
this.heartbeatManager.addBrokerLifecycleListener(this::onBrokerInactive);
Expand All @@ -106,34 +108,39 @@ protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T valu
}

/**
* When the heartbeatManager detects the "Broker is not active",
* we call this method to elect a master and do something else.
* When the heartbeatManager detects the "Broker is not active", we call this method to elect a master and do
* something else.
*
* @param clusterName The cluster name of this inactive broker
* @param brokerName The inactive broker name
* @param brokerAddress The inactive broker address(ip)
* @param brokerId The inactive broker id
*/
private void onBrokerInactive(String clusterName, String brokerName, String brokerAddress, long brokerId) {
if (brokerId == MixAll.MASTER_ID) {
if (controller.isLeaderState()) {
final CompletableFuture<RemotingCommand> future = controller.electMaster(new ElectMasterRequestHeader(brokerName));
try {
final RemotingCommand response = future.get(5, TimeUnit.SECONDS);
final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
if (responseHeader != null) {
log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", brokerName, brokerAddress, responseHeader);
if (StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
heartbeatManager.changeBrokerMetadata(clusterName, responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
}
if (controllerConfig.isNotifyBrokerRoleChanged()) {
notifyBrokerRoleChanged(responseHeader, clusterName);
}
if (controller.isLeaderState()) {
try {
final CompletableFuture<RemotingCommand> replicaInfoFuture = controller.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress));
final RemotingCommand replicaInfoResponse = replicaInfoFuture.get(5, TimeUnit.SECONDS);
final GetReplicaInfoResponseHeader replicaInfoResponseHeader = (GetReplicaInfoResponseHeader) replicaInfoResponse.readCustomHeader();
// Not master broker offline
if (!replicaInfoResponseHeader.getMasterAddress().equals(brokerAddress)) {
log.warn("The {} broker with IP address {} shutdown", brokerName, brokerAddress);
return;
}
final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(new ElectMasterRequestHeader(brokerName));
final RemotingCommand electMasterResponse = electMasterFuture.get(5, TimeUnit.SECONDS);
final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) electMasterResponse.readCustomHeader();
if (responseHeader != null) {
log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", brokerName, brokerAddress, responseHeader);
if (controllerConfig.isNotifyBrokerRoleChanged()) {
notifyBrokerRoleChanged(responseHeader, clusterName);
}
} catch (Exception ignored) {
}
} else {
log.info("Broker{}' master shutdown", brokerName);
} catch (Exception e) {
log.error("", e);
}
} else {
log.info("The {} broker with IP address {} shutdown", brokerName, brokerAddress);
}
}

Expand Down Expand Up @@ -161,11 +168,11 @@ public void notifyBrokerRoleChanged(final ElectMasterResponseHeader electMasterR
}

public void doNotifyBrokerRoleChanged(final String brokerAddr, final Long brokerId,
final ElectMasterResponseHeader responseHeader) {
final ElectMasterResponseHeader responseHeader) {
if (StringUtils.isNoneEmpty(brokerAddr)) {
log.info("Try notify broker {} with id {} that role changed, responseHeader:{}", brokerAddr, brokerId, responseHeader);
final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(responseHeader.getNewMasterAddress(),
responseHeader.getMasterEpoch(), responseHeader.getSyncStateSetEpoch(), brokerId);
responseHeader.getMasterEpoch(), responseHeader.getSyncStateSetEpoch(), brokerId);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, requestHeader);
try {
this.remotingClient.invokeOneway(brokerAddr, request, 3000);
Expand Down
Loading

0 comments on commit 96708de

Please sign in to comment.