diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java index cc9c625004b..b3556472555 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java @@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -34,6 +33,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -66,12 +66,7 @@ public TopicRouteInfoManager(BrokerController brokerController) { } public void start() { - this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "TopicRouteInfoManagerScheduledThread"); - } - }); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread")); this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index e5aed64d3ea..2d37581bbf1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -33,7 +33,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.Validators; @@ -169,12 +168,7 @@ public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePull this.defaultLitePullConsumer.getPullThreadNums(), new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup()) ); - this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "MonitorMessageQueueChangeThread"); - } - }); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorMessageQueueChangeThread")); this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java index d4801c335dc..b5e6f9f7900 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java @@ -19,10 +19,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.message.MessageRequestMode; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -34,12 +34,7 @@ public class PullMessageService extends ServiceThread { private final MQClientInstance mQClientFactory; private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "PullMessageServiceScheduledThread"); - } - }); + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("PullMessageServiceScheduledThread")); public PullMessageService(MQClientInstance mQClientFactory) { this.mQClientFactory = mQClientFactory; @@ -127,9 +122,9 @@ public void run() { try { MessageRequest messageRequest = this.messageRequestQueue.take(); if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) { - this.popMessage((PopRequest)messageRequest); + this.popMessage((PopRequest) messageRequest); } else { - this.pullMessage((PullRequest)messageRequest); + this.pullMessage((PullRequest) messageRequest); } } catch (InterruptedException ignored) { } catch (Exception e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 4f6fbafddf8..4677df6904f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -31,10 +31,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; @@ -67,6 +65,7 @@ import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.compression.CompressionType; import org.apache.rocketmq.common.compression.Compressor; @@ -140,23 +139,16 @@ public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook 1000 * 60, TimeUnit.MILLISECONDS, this.asyncSenderThreadPoolQueue, - new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); - } - }); + new ThreadFactoryImpl("AsyncSenderExecutor_")); if (defaultMQProducer.getBackPressureForAsyncSendNum() > 10) { - semaphoreAsyncSendNum = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),10), true); + semaphoreAsyncSendNum = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(), 10), true); } else { semaphoreAsyncSendNum = new Semaphore(10, true); log.info("semaphoreAsyncSendNum can not be smaller than 10."); } if (defaultMQProducer.getBackPressureForAsyncSendNum() > 1024 * 1024) { - semaphoreAsyncSendSize = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),1024 * 1024), true); + semaphoreAsyncSendSize = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(), 1024 * 1024), true); } else { semaphoreAsyncSendSize = new Semaphore(1024 * 1024, true); log.info("semaphoreAsyncSendSize can not be smaller than 1M."); @@ -529,7 +521,7 @@ public void run() { } } else { sendCallback.onException( - new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout")); + new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout")); } } }; @@ -537,8 +529,8 @@ public void run() { } public void executeAsyncMessageSend(Runnable runnable, final Message msg, final SendCallback sendCallback, - final long timeout, final long beginStartTime) - throws MQClientException, InterruptedException { + final long timeout, final long beginStartTime) + throws MQClientException, InterruptedException { ExecutorService executor = this.getAsyncSenderExecutor(); boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode(); boolean isSemaphoreAsyncNumAquired = false; @@ -549,18 +541,18 @@ public void executeAsyncMessageSend(Runnable runnable, final Message msg, final if (isEnableBackpressureForAsyncMode) { long costTime = System.currentTimeMillis() - beginStartTime; isSemaphoreAsyncNumAquired = timeout - costTime > 0 - && semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS); + && semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS); if (!isSemaphoreAsyncNumAquired) { sendCallback.onException( - new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout")); + new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout")); return; } costTime = System.currentTimeMillis() - beginStartTime; isSemaphoreAsyncSizeAquired = timeout - costTime > 0 - && semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS); + && semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS); if (!isSemaphoreAsyncSizeAquired) { sendCallback.onException( - new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout")); + new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout")); return; } } @@ -1035,6 +1027,7 @@ public void doExecuteEndTransactionHook(Message msg, String msgId, String broker executeEndTransactionHook(context); } } + /** * DEFAULT ONEWAY ------------------------------------------------------- */ @@ -1231,7 +1224,7 @@ public void run() { try { try { sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, - timeout - costTime); + timeout - costTime); } catch (MQBrokerException e) { throw new MQClientException("unknown exception", e); } diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java index 99526f3a10e..4b366d4e39b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java @@ -24,7 +24,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -63,38 +63,20 @@ public static ThreadFactory newGenericThreadFactory(String processName, int thre } public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) { - return new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet())); - thread.setDaemon(isDaemon); - return thread; - } - }; + return new ThreadFactoryImpl(processName + "_", isDaemon); } public static ThreadFactory newGenericThreadFactory(final String processName, final int threads, final boolean isDaemon) { - return new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r, String.format("%s_%d_%d", processName, threads, this.threadIndex.incrementAndGet())); - thread.setDaemon(isDaemon); - return thread; - } - }; + return new ThreadFactoryImpl(String.format("%s_%d_", processName, threads), isDaemon); } /** * Create a new thread * - * @param name The name of the thread + * @param name The name of the thread * @param runnable The work for the thread to do - * @param daemon Should the thread block JVM stop? + * @param daemon Should the thread block JVM stop? * @return The unstarted thread */ public static Thread newThread(String name, Runnable runnable, boolean daemon) { @@ -121,7 +103,7 @@ public static void shutdownGracefully(final Thread t) { * Shutdown passed thread using isAlive and join. * * @param millis Pass 0 if we're to wait forever. - * @param t Thread to stop + * @param t Thread to stop */ public static void shutdownGracefully(final Thread t, final long millis) { if (t == null) @@ -141,7 +123,7 @@ public static void shutdownGracefully(final Thread t, final long millis) { * {@link ExecutorService}. * * @param executor executor - * @param timeout timeout + * @param timeout timeout * @param timeUnit timeUnit */ public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) { diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java index ff9ef9ca34e..5ac8d247d95 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java @@ -21,13 +21,13 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -40,17 +40,12 @@ public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("127.0.0.1:9876"); Set topics = new HashSet<>(); - //You would better to register topics,It will use in rebalance when starting + //You would be better to register topics,It will use in rebalance when starting topics.add("TopicTest"); consumer.setRegisterTopics(topics); consumer.start(); - ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "PullConsumerThread"); - } - }); + ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactoryImpl("PullConsumerThread")); for (String topic : consumer.getRegisterTopics()) { executors.execute(new Runnable() { @@ -137,7 +132,7 @@ public long consumeFromOffset(MessageQueue messageQueue) throws MQClientExceptio public void incPullTPS(String topic, int pullSize) { consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory() - .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize); + .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize); } }); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 540e0b76df8..69d8a275be5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -61,7 +61,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -70,6 +69,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -139,37 +139,15 @@ public NettyRemotingClient(final NettyClientConfig nettyClientConfig, publicThreadNums = 4; } - this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { - private final AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet()); - } - }); + this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactoryImpl("NettyClientPublicExecutor_")); this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(32), new ThreadFactory() { - private final AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "NettyClientScan_thread_" + this.threadIndex.incrementAndGet()); - } - } - ); + new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("NettyClientScan_thread_")); if (eventLoopGroup != null) { this.eventLoopGroupWorker = eventLoopGroup; } else { - this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() { - private final AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet())); - } - }); + this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactoryImpl("NettyClientSelector_")); } this.defaultEventExecutorGroup = eventExecutorGroup; @@ -205,15 +183,7 @@ public void start() { if (this.defaultEventExecutorGroup == null) { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), - new ThreadFactory() { - - private AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); - } - }); + new ThreadFactoryImpl("NettyClientWorkerThread_")); } Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java index ee6f3f6c2c4..eb623a9de92 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java @@ -21,9 +21,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.junit.Assert; import org.junit.Test; @@ -45,14 +44,7 @@ public void remotingCodeCountTest() throws Exception { int count = 1000 * 1000; CountDownLatch latch = new CountDownLatch(threadCount); AtomicBoolean result = new AtomicBoolean(true); - ExecutorService executorService = Executors.newFixedThreadPool(threadCount, new ThreadFactory() { - private final AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "RemotingCodeTest_" + this.threadIndex.incrementAndGet()); - } - }); + ExecutorService executorService = Executors.newFixedThreadPool(threadCount, new ThreadFactoryImpl("RemotingCodeTest_")); for (int i = 0; i < threadCount; i++) { executorService.submit(() -> {