Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #6346] Support asynchronously notify brokers when their roles has been changed #6348

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private boolean startBasicService() {
// The scheduled task for heartbeat sending is not starting now, so we should manually send heartbeat request
this.sendHeartbeatToController();
if (this.masterBrokerId != null || brokerElect()) {
LOGGER.info("Master in this broker set is elected, masterBrokerId: {}, masterBrokerAddr: {}", this.masterAddress, this.masterBrokerId);
LOGGER.info("Master in this broker set is elected, masterBrokerId: {}, masterBrokerAddr: {}", this.masterBrokerId, this.masterAddress);
this.state = State.RUNNING;
this.brokerController.setIsolated(false);
LOGGER.info("All register process has been done, change state to: {}", this.state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@
package org.apache.rocketmq.controller;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
Expand Down Expand Up @@ -67,6 +72,8 @@ public class ControllerManager {
private ExecutorService controllerRequestExecutor;
private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;

private NotifyService notifyService;

public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig) {
this.controllerConfig = controllerConfig;
Expand All @@ -77,6 +84,7 @@ public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig ne
this.configuration.setStorePathFromConfig(this.controllerConfig, "configStorePath");
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig);
this.notifyService = new NotifyService();
}

public boolean initialize() {
Expand All @@ -93,6 +101,7 @@ protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T valu
return new FutureTaskExt<T>(runnable, value);
}
};
this.notifyService.initialize();
TheR1sing3un marked this conversation as resolved.
Show resolved Hide resolved
if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) {
throw new IllegalArgumentException("Attribute value controllerDLegerPeers of ControllerConfig is null or empty");
}
Expand Down Expand Up @@ -164,7 +173,7 @@ public void notifyBrokerRoleChanged(final RoleChangeNotifyEntry entry) {
// Inform all active brokers
final Map<Long, String> brokerAddrs = memberGroup.getBrokerAddrs();
brokerAddrs.entrySet().stream().filter(x -> this.heartbeatManager.isBrokerActive(clusterName, brokerName, x.getKey()))
.forEach(x -> doNotifyBrokerRoleChanged(x.getValue(), entry));
.forEach(x -> this.notifyService.notifyBroker(x.getValue(), entry));
}
}

Expand Down Expand Up @@ -214,6 +223,7 @@ public void start() {
public void shutdown() {
this.heartbeatManager.shutdown();
this.controllerRequestExecutor.shutdown();
this.notifyService.shutdown();
this.controller.shutdown();
this.remotingClient.shutdown();
}
Expand Down Expand Up @@ -245,4 +255,77 @@ public BrokerHousekeepingService getBrokerHousekeepingService() {
public Configuration getConfiguration() {
return configuration;
}

class NotifyService {
private ExecutorService executorService;

private Map<String/*brokerAddress*/, NotifyTask/*currentNotifyTask*/> currentNotifyFutures;

public NotifyService() {
}

public void initialize() {
this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ControllerManager_NotifyService_"));
this.currentNotifyFutures = new ConcurrentHashMap<>();
}

public void notifyBroker(String brokerAddress, RoleChangeNotifyEntry entry) {
int masterEpoch = entry.getMasterEpoch();
NotifyTask oldTask = this.currentNotifyFutures.get(brokerAddress);
if (oldTask != null && masterEpoch > oldTask.getMasterEpoch()) {
// cancel current future
Future oldFuture = oldTask.getFuture();
if (oldFuture != null && !oldFuture.isDone()) {
oldFuture.cancel(true);
}
}
final NotifyTask task = new NotifyTask(masterEpoch, null);
Runnable runnable = () -> {
doNotifyBrokerRoleChanged(brokerAddress, entry);
this.currentNotifyFutures.remove(brokerAddress, task);
};
this.currentNotifyFutures.put(brokerAddress, task);
Future<?> future = this.executorService.submit(runnable);
task.setFuture(future);
}

public void shutdown() {
if (!this.executorService.isShutdown()) {
this.executorService.shutdownNow();
}
}

class NotifyTask extends Pair<Integer/*epochMaster*/, Future/*notifyFuture*/> {
public NotifyTask(Integer masterEpoch, Future future) {
super(masterEpoch, future);
}

public Integer getMasterEpoch() {
return super.getObject1();
}

public Future getFuture() {
return super.getObject2();
}

public void setFuture(Future future) {
super.setObject2(future);
}

@Override
public int hashCode() {
return Objects.hashCode(super.getObject1());
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (!(obj instanceof NotifyTask)) {
return false;
}
NotifyTask task = (NotifyTask) obj;
return super.getObject1().equals(task.getObject1());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.controller.impl.controller;
package org.apache.rocketmq.controller;

import java.io.File;
import java.time.Duration;
Expand All @@ -26,7 +26,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.controller.ControllerManager;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.rocketmq.controller.impl.controller;
package org.apache.rocketmq.controller;

public class ControllerTestBase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.controller.impl.controller.impl;
package org.apache.rocketmq.controller.impl;

import io.openmessaging.storage.dledger.DLedgerConfig;
import java.io.File;
Expand All @@ -31,7 +31,6 @@
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
Expand All @@ -49,9 +48,9 @@
import org.junit.Before;
import org.junit.Test;

import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP;
import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_IP;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.controller.impl.controller.impl;
package org.apache.rocketmq.controller.impl;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.controller.impl.controller.impl.manager;
package org.apache.rocketmq.controller.impl.manager;

import java.util.Arrays;
import java.util.HashSet;
Expand All @@ -29,7 +29,6 @@
import org.apache.rocketmq.controller.impl.event.ControllerResult;
import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
import org.apache.rocketmq.controller.impl.event.EventMessage;
import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
Expand All @@ -51,9 +50,9 @@
import org.junit.Before;
import org.junit.Test;

import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
import static org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP;
import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
import static org.apache.rocketmq.controller.ControllerTestBase.DEFAULT_IP;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down