Skip to content

Commit

Permalink
[ISSUE #6189] Replace ThreadFactory create (#6190)
Browse files Browse the repository at this point in the history
* Replace ThreadFactory create

* delete unnecessary imports

* remove useless import

* reformat code
  • Loading branch information
hardyfish authored Mar 8, 2023
1 parent 9e0fb1b commit 5c45294
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -34,6 +33,7 @@
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand Down Expand Up @@ -66,12 +66,7 @@ public TopicRouteInfoManager(BrokerController brokerController) {
}

public void start() {
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "TopicRouteInfoManagerScheduledThread");
}
});
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread"));

this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.Validators;
Expand Down Expand Up @@ -169,12 +168,7 @@ public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePull
this.defaultLitePullConsumer.getPullThreadNums(),
new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
);
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MonitorMessageQueueChangeThread");
}
});
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorMessageQueueChangeThread"));
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand All @@ -34,12 +34,7 @@ public class PullMessageService extends ServiceThread {

private final MQClientInstance mQClientFactory;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "PullMessageServiceScheduledThread");
}
});
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("PullMessageServiceScheduledThread"));

public PullMessageService(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
Expand Down Expand Up @@ -127,9 +122,9 @@ public void run() {
try {
MessageRequest messageRequest = this.messageRequestQueue.take();
if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) {
this.popMessage((PopRequest)messageRequest);
this.popMessage((PopRequest) messageRequest);
} else {
this.pullMessage((PullRequest)messageRequest);
this.pullMessage((PullRequest) messageRequest);
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
Expand Down Expand Up @@ -67,6 +65,7 @@
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.compression.Compressor;
Expand Down Expand Up @@ -140,23 +139,16 @@ public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
new ThreadFactoryImpl("AsyncSenderExecutor_"));
if (defaultMQProducer.getBackPressureForAsyncSendNum() > 10) {
semaphoreAsyncSendNum = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),10), true);
semaphoreAsyncSendNum = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(), 10), true);
} else {
semaphoreAsyncSendNum = new Semaphore(10, true);
log.info("semaphoreAsyncSendNum can not be smaller than 10.");
}

if (defaultMQProducer.getBackPressureForAsyncSendNum() > 1024 * 1024) {
semaphoreAsyncSendSize = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),1024 * 1024), true);
semaphoreAsyncSendSize = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(), 1024 * 1024), true);
} else {
semaphoreAsyncSendSize = new Semaphore(1024 * 1024, true);
log.info("semaphoreAsyncSendSize can not be smaller than 1M.");
Expand Down Expand Up @@ -529,16 +521,16 @@ public void run() {
}
} else {
sendCallback.onException(
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
}
}
};
executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime);
}

public void executeAsyncMessageSend(Runnable runnable, final Message msg, final SendCallback sendCallback,
final long timeout, final long beginStartTime)
throws MQClientException, InterruptedException {
final long timeout, final long beginStartTime)
throws MQClientException, InterruptedException {
ExecutorService executor = this.getAsyncSenderExecutor();
boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
boolean isSemaphoreAsyncNumAquired = false;
Expand All @@ -549,18 +541,18 @@ public void executeAsyncMessageSend(Runnable runnable, final Message msg, final
if (isEnableBackpressureForAsyncMode) {
long costTime = System.currentTimeMillis() - beginStartTime;
isSemaphoreAsyncNumAquired = timeout - costTime > 0
&& semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS);
&& semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS);
if (!isSemaphoreAsyncNumAquired) {
sendCallback.onException(
new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout"));
new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout"));
return;
}
costTime = System.currentTimeMillis() - beginStartTime;
isSemaphoreAsyncSizeAquired = timeout - costTime > 0
&& semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS);
&& semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS);
if (!isSemaphoreAsyncSizeAquired) {
sendCallback.onException(
new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout"));
new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout"));
return;
}
}
Expand Down Expand Up @@ -1035,6 +1027,7 @@ public void doExecuteEndTransactionHook(Message msg, String msgId, String broker
executeEndTransactionHook(context);
}
}

/**
* DEFAULT ONEWAY -------------------------------------------------------
*/
Expand Down Expand Up @@ -1231,7 +1224,7 @@ public void run() {
try {
try {
sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
timeout - costTime);
timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -63,38 +63,20 @@ public static ThreadFactory newGenericThreadFactory(String processName, int thre
}

public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) {
return new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet()));
thread.setDaemon(isDaemon);
return thread;
}
};
return new ThreadFactoryImpl(processName + "_", isDaemon);
}

public static ThreadFactory newGenericThreadFactory(final String processName, final int threads,
final boolean isDaemon) {
return new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, String.format("%s_%d_%d", processName, threads, this.threadIndex.incrementAndGet()));
thread.setDaemon(isDaemon);
return thread;
}
};
return new ThreadFactoryImpl(String.format("%s_%d_", processName, threads), isDaemon);
}

/**
* Create a new thread
*
* @param name The name of the thread
* @param name The name of the thread
* @param runnable The work for the thread to do
* @param daemon Should the thread block JVM stop?
* @param daemon Should the thread block JVM stop?
* @return The unstarted thread
*/
public static Thread newThread(String name, Runnable runnable, boolean daemon) {
Expand All @@ -121,7 +103,7 @@ public static void shutdownGracefully(final Thread t) {
* Shutdown passed thread using isAlive and join.
*
* @param millis Pass 0 if we're to wait forever.
* @param t Thread to stop
* @param t Thread to stop
*/
public static void shutdownGracefully(final Thread t, final long millis) {
if (t == null)
Expand All @@ -141,7 +123,7 @@ public static void shutdownGracefully(final Thread t, final long millis) {
* {@link ExecutorService}.
*
* @param executor executor
* @param timeout timeout
* @param timeout timeout
* @param timeUnit timeUnit
*/
public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
Expand All @@ -40,17 +40,12 @@ public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
Set<String> topics = new HashSet<>();
//You would better to register topics,It will use in rebalance when starting
//You would be better to register topics,It will use in rebalance when starting
topics.add("TopicTest");
consumer.setRegisterTopics(topics);
consumer.start();

ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "PullConsumerThread");
}
});
ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactoryImpl("PullConsumerThread"));
for (String topic : consumer.getRegisterTopics()) {

executors.execute(new Runnable() {
Expand Down Expand Up @@ -137,7 +132,7 @@ public long consumeFromOffset(MessageQueue messageQueue) throws MQClientExceptio

public void incPullTPS(String topic, int pullSize) {
consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
.getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
.getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -70,6 +69,7 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -139,37 +139,15 @@ public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
publicThreadNums = 4;
}

this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactoryImpl("NettyClientPublicExecutor_"));

this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(32), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientScan_thread_" + this.threadIndex.incrementAndGet());
}
}
);
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("NettyClientScan_thread_"));

if (eventLoopGroup != null) {
this.eventLoopGroupWorker = eventLoopGroup;
} else {
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactoryImpl("NettyClientSelector_"));
}
this.defaultEventExecutorGroup = eventExecutorGroup;

Expand Down Expand Up @@ -205,15 +183,7 @@ public void start() {
if (this.defaultEventExecutorGroup == null) {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
new ThreadFactoryImpl("NettyClientWorkerThread_"));
}
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
Expand Down
Loading

0 comments on commit 5c45294

Please sign in to comment.