diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index 068187e4013..d3a1c1fb80b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -202,7 +202,7 @@ private boolean startBasicService() { // The scheduled task for heartbeat sending is not starting now, so we should manually send heartbeat request this.sendHeartbeatToController(); if (this.masterBrokerId != null || brokerElect()) { - LOGGER.info("Master in this broker set is elected, masterBrokerId: {}, masterBrokerAddr: {}", this.masterAddress, this.masterBrokerId); + LOGGER.info("Master in this broker set is elected, masterBrokerId: {}, masterBrokerAddr: {}", this.masterBrokerId, this.masterAddress); this.state = State.RUNNING; this.brokerController.setIsolated(false); LOGGER.info("All register process has been done, change state to: {}", this.state); diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java index 18e9992d31c..c4fbf0c8d2f 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java @@ -17,9 +17,13 @@ package org.apache.rocketmq.controller; import java.util.Map; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadPoolExecutor; @@ -27,6 +31,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.ControllerConfig; +import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.future.FutureTaskExt; @@ -67,6 +72,8 @@ public class ControllerManager { private ExecutorService controllerRequestExecutor; private BlockingQueue controllerRequestThreadPoolQueue; + private NotifyService notifyService; + public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) { this.controllerConfig = controllerConfig; @@ -77,6 +84,7 @@ public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig ne this.configuration.setStorePathFromConfig(this.controllerConfig, "configStorePath"); this.remotingClient = new NettyRemotingClient(nettyClientConfig); this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig); + this.notifyService = new NotifyService(); } public boolean initialize() { @@ -93,6 +101,7 @@ protected RunnableFuture newTaskFor(final Runnable runnable, final T valu return new FutureTaskExt(runnable, value); } }; + this.notifyService.initialize(); if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) { throw new IllegalArgumentException("Attribute value controllerDLegerPeers of ControllerConfig is null or empty"); } @@ -164,7 +173,7 @@ public void notifyBrokerRoleChanged(final RoleChangeNotifyEntry entry) { // Inform all active brokers final Map brokerAddrs = memberGroup.getBrokerAddrs(); brokerAddrs.entrySet().stream().filter(x -> this.heartbeatManager.isBrokerActive(clusterName, brokerName, x.getKey())) - .forEach(x -> doNotifyBrokerRoleChanged(x.getValue(), entry)); + .forEach(x -> this.notifyService.notifyBroker(x.getValue(), entry)); } } @@ -214,6 +223,7 @@ public void start() { public void shutdown() { this.heartbeatManager.shutdown(); this.controllerRequestExecutor.shutdown(); + this.notifyService.shutdown(); this.controller.shutdown(); this.remotingClient.shutdown(); } @@ -245,4 +255,77 @@ public BrokerHousekeepingService getBrokerHousekeepingService() { public Configuration getConfiguration() { return configuration; } + + class NotifyService { + private ExecutorService executorService; + + private Map currentNotifyFutures; + + public NotifyService() { + } + + public void initialize() { + this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ControllerManager_NotifyService_")); + this.currentNotifyFutures = new ConcurrentHashMap<>(); + } + + public void notifyBroker(String brokerAddress, RoleChangeNotifyEntry entry) { + int masterEpoch = entry.getMasterEpoch(); + NotifyTask oldTask = this.currentNotifyFutures.get(brokerAddress); + if (oldTask != null && masterEpoch > oldTask.getMasterEpoch()) { + // cancel current future + Future oldFuture = oldTask.getFuture(); + if (oldFuture != null && !oldFuture.isDone()) { + oldFuture.cancel(true); + } + } + final NotifyTask task = new NotifyTask(masterEpoch, null); + Runnable runnable = () -> { + doNotifyBrokerRoleChanged(brokerAddress, entry); + this.currentNotifyFutures.remove(brokerAddress, task); + }; + this.currentNotifyFutures.put(brokerAddress, task); + Future future = this.executorService.submit(runnable); + task.setFuture(future); + } + + public void shutdown() { + if (!this.executorService.isShutdown()) { + this.executorService.shutdownNow(); + } + } + + class NotifyTask extends Pair { + public NotifyTask(Integer masterEpoch, Future future) { + super(masterEpoch, future); + } + + public Integer getMasterEpoch() { + return super.getObject1(); + } + + public Future getFuture() { + return super.getObject2(); + } + + public void setFuture(Future future) { + super.setObject2(future); + } + + @Override + public int hashCode() { + return Objects.hashCode(super.getObject1()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof NotifyTask)) { + return false; + } + NotifyTask task = (NotifyTask) obj; + return super.getObject1().equals(task.getObject1()); + } + } + } } diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/ControllerManagerTest.java similarity index 99% rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java rename to controller/src/test/java/org/apache/rocketmq/controller/ControllerManagerTest.java index b7a4c328e48..8ad67d404e5 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/ControllerManagerTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.controller.impl.controller; +package org.apache.rocketmq.controller; import java.io.File; import java.time.Duration; @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.ControllerConfig; import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.controller.ControllerManager; import org.apache.rocketmq.controller.impl.DLedgerController; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.netty.NettyClientConfig; diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java b/controller/src/test/java/org/apache/rocketmq/controller/ControllerTestBase.java similarity index 95% rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java rename to controller/src/test/java/org/apache/rocketmq/controller/ControllerTestBase.java index 9b8fa757c52..f77f49dcf25 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/ControllerTestBase.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.controller.impl.controller; +package org.apache.rocketmq.controller; public class ControllerTestBase { diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java similarity index 97% rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java rename to controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java index 3bffad689c1..eaf78b63dff 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.controller.impl.controller.impl; +package org.apache.rocketmq.controller.impl; import io.openmessaging.storage.dledger.DLedgerConfig; import java.io.File; @@ -31,7 +31,6 @@ import org.apache.rocketmq.common.ControllerConfig; import org.apache.rocketmq.controller.Controller; import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy; -import org.apache.rocketmq.controller.impl.DLedgerController; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.ResponseCode; @@ -49,9 +48,9 @@ import org.junit.Before; import org.junit.Test; -import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME; -import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME; -import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP; +import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_BROKER_NAME; +import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME; +import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_IP; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java similarity index 97% rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java rename to controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java index 74de637dd2f..b97ea3249e0 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.controller.impl.controller.impl; +package org.apache.rocketmq.controller.impl; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java similarity index 98% rename from controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java rename to controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java index f677daf285c..19411e778e0 100644 --- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.controller.impl.controller.impl.manager; +package org.apache.rocketmq.controller.impl.manager; import java.util.Arrays; import java.util.HashSet; @@ -29,7 +29,6 @@ import org.apache.rocketmq.controller.impl.event.ControllerResult; import org.apache.rocketmq.controller.impl.event.ElectMasterEvent; import org.apache.rocketmq.controller.impl.event.EventMessage; -import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; @@ -51,9 +50,9 @@ import org.junit.Before; import org.junit.Test; -import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME; -import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME; -import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP; +import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_BROKER_NAME; +import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME; +import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_IP; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse;