diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/EagerExecutorService.java b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/EagerExecutorService.java new file mode 100644 index 000000000000..94c470652caf --- /dev/null +++ b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/EagerExecutorService.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shenyu.common.concurrent; + +import java.util.concurrent.ExecutorService; + +/** + * EagerExecutorService. + */ +public interface EagerExecutorService extends ExecutorService { + + /** + * Returns the current number of threads in the pool. + * + * @return the number of threads + */ + int getPoolSize(); + + /** + * Returns the approximate number of threads that are actively + * executing tasks. + * + * @return the number of threads + */ + int getActiveCount(); + + /** + * Returns the maximum allowed number of threads. + * + * @return the maximum allowed number of threads + */ + int getMaximumPoolSize(); +} diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemoryLimitedTaskQueue.java b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemoryLimitedTaskQueue.java index c6a1ec5c5e2e..5df59eb09ed1 100644 --- a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemoryLimitedTaskQueue.java +++ b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemoryLimitedTaskQueue.java @@ -18,8 +18,6 @@ package org.apache.shenyu.common.concurrent; import java.lang.instrument.Instrumentation; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; /** * MemoryLimitedTaskQueue in the {@link org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor}. @@ -28,11 +26,11 @@ * That can make the executor create new worker * when the task num is bigger than corePoolSize but less than maximumPoolSize. */ -public class MemoryLimitedTaskQueue extends MemoryLimitedLinkedBlockingQueue { +public class MemoryLimitedTaskQueue extends MemoryLimitedLinkedBlockingQueue implements TaskQueue { private static final long serialVersionUID = -2635853580887179627L; - private ShenyuThreadPoolExecutor executor; + private EagerExecutorService executor; public MemoryLimitedTaskQueue(final Instrumentation inst) { super(inst); @@ -42,50 +40,18 @@ public MemoryLimitedTaskQueue(final long memoryLimit, final Instrumentation inst super(memoryLimit, inst); } - /** - * set the executor. - * - * @param executor executor - */ - public void setExecutor(final ShenyuThreadPoolExecutor executor) { - this.executor = executor; + @Override + public EagerExecutorService getExecutor() { + return executor; } @Override - public boolean offer(final Runnable runnable) { - if (executor == null) { - throw new RejectedExecutionException("The task queue does not have executor!"); - } - - int currentPoolThreadSize = executor.getPoolSize(); - // have free worker. put task into queue to let the worker deal with task. - if (executor.getActiveCount() < currentPoolThreadSize) { - return super.offer(runnable); - } - - // return false to let executor create new worker. - if (currentPoolThreadSize < executor.getMaximumPoolSize()) { - return false; - } - - // currentPoolThreadSize >= max - return super.offer(runnable); + public void setExecutor(final EagerExecutorService executor) { + this.executor = executor; } - /** - * retry offer task. - * - * @param o task - * @param timeout timeout - * @param unit timeout unit - * @return offer success or not - * @throws java.util.concurrent.RejectedExecutionException if executor is terminated. - * @throws java.lang.InterruptedException if the current thread is interrupted. - */ - public boolean retryOffer(final Runnable o, final long timeout, final TimeUnit unit) throws InterruptedException { - if (executor.isShutdown()) { - throw new RejectedExecutionException("Executor is shutdown!"); - } - return super.offer(o, timeout, unit); + @Override + public boolean doOffer(final Runnable runnable) { + return super.offer(runnable); } } diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/ShenyuThreadPoolExecutor.java b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/ShenyuThreadPoolExecutor.java index 1a9f93b1f561..431a38043a6b 100644 --- a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/ShenyuThreadPoolExecutor.java +++ b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/ShenyuThreadPoolExecutor.java @@ -26,16 +26,17 @@ /** * ShenyuThreadPoolExecutor. */ -public class ShenyuThreadPoolExecutor extends ThreadPoolExecutor { +public class ShenyuThreadPoolExecutor extends ThreadPoolExecutor implements EagerExecutorService { public ShenyuThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, - final MemoryLimitedTaskQueue workQueue, + final TaskQueue workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + workQueue.setExecutor(this); } @Override @@ -48,8 +49,7 @@ public void execute(final Runnable command) { super.execute(command); } catch (RejectedExecutionException e) { // retry to offer the task into queue. - @SuppressWarnings("all") - final MemoryLimitedTaskQueue queue = (MemoryLimitedTaskQueue) super.getQueue(); + final TaskQueue queue = (TaskQueue) super.getQueue(); try { if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { throw new RejectedExecutionException("Queue capacity is full.", e); diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/TaskQueue.java b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/TaskQueue.java new file mode 100644 index 000000000000..fce00cc21a9e --- /dev/null +++ b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/TaskQueue.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shenyu.common.concurrent; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * TaskQueue. + */ +public interface TaskQueue extends BlockingQueue { + + /** + * get executor. + * + * @return the executor + */ + EagerExecutorService getExecutor(); + + /** + * set the executor. + * + * @param executor executor + */ + void setExecutor(EagerExecutorService executor); + + @Override + default boolean offer(final E e) { + if (getExecutor() == null) { + throw new RejectedExecutionException("The task queue does not have executor!"); + } + + int currentPoolThreadSize = getExecutor().getPoolSize(); + // have free worker. put task into queue to let the worker deal with task. + if (getExecutor().getActiveCount() < currentPoolThreadSize) { + return doOffer(e); + } + + // return false to let executor create new worker. + if (currentPoolThreadSize < getExecutor().getMaximumPoolSize()) { + return false; + } + + // currentPoolThreadSize >= max + return doOffer(e); + } + + /** + * offer element to the queue. + * + * @param e the element to add + * @return {@code true} if the element was added to this queue, else {@code false} + */ + boolean doOffer(E e); + + /** + * retry offer task. + * + * @param o task + * @param timeout timeout + * @param unit timeout unit + * @return offer success or not + * @throws java.util.concurrent.RejectedExecutionException if executor is terminated. + * @throws java.lang.InterruptedException if the current thread is interrupted. + */ + default boolean retryOffer(final E o, final long timeout, final TimeUnit unit) throws InterruptedException { + if (getExecutor().isShutdown()) { + throw new RejectedExecutionException("Executor is shutdown!"); + } + return offer(o, timeout, unit); + } +} diff --git a/shenyu-web/src/main/java/org/apache/shenyu/web/configuration/ShenyuThreadPoolConfiguration.java b/shenyu-web/src/main/java/org/apache/shenyu/web/configuration/ShenyuThreadPoolConfiguration.java index a84161e706c6..c0b8ed4f190c 100644 --- a/shenyu-web/src/main/java/org/apache/shenyu/web/configuration/ShenyuThreadPoolConfiguration.java +++ b/shenyu-web/src/main/java/org/apache/shenyu/web/configuration/ShenyuThreadPoolConfiguration.java @@ -21,8 +21,13 @@ import org.apache.shenyu.common.concurrent.MemoryLimitedTaskQueue; import org.apache.shenyu.common.concurrent.ShenyuThreadFactory; import org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor; +import org.apache.shenyu.common.concurrent.TaskQueue; import org.apache.shenyu.common.config.ShenyuConfig; +import org.apache.shenyu.common.exception.ShenyuException; import org.apache.shenyu.plugin.api.utils.SpringBeanUtils; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Bean; @@ -39,23 +44,42 @@ @Configuration public class ShenyuThreadPoolConfiguration { + /** + * MemoryLimitedTaskQueue. + * + * @param shenyuConfig the shenyu config + * @return instance of {@link MemoryLimitedTaskQueue} + */ + @Bean + @ConditionalOnMissingBean(TaskQueue.class) + @ConditionalOnProperty("shenyu.sharedPool.maxWorkQueueMemory") + public TaskQueue memoryLimitedTaskQueue(final ShenyuConfig shenyuConfig) { + final Instrumentation instrumentation = ByteBuddyAgent.install(); + final ShenyuConfig.SharedPool sharedPool = shenyuConfig.getSharedPool(); + final Long maxWorkQueueMemory = sharedPool.getMaxWorkQueueMemory(); + if (maxWorkQueueMemory <= 0) { + throw new ShenyuException("${shenyu.sharedPool.maxWorkQueueMemory} must bigger than 0 !"); + } + return new MemoryLimitedTaskQueue<>(maxWorkQueueMemory, instrumentation); + } + /** * crate shenyu shared thread pool executor. * * @param shenyuConfig the shenyu config + * @param provider the queue bean provider * @return the shenyu thread pool executor */ @Bean @ConditionalOnProperty(name = "shenyu.sharedPool.enable", havingValue = "true") - public ShenyuThreadPoolExecutor shenyuThreadPoolExecutor(final ShenyuConfig shenyuConfig) { - final Instrumentation instrumentation = ByteBuddyAgent.install(); + public ShenyuThreadPoolExecutor shenyuThreadPoolExecutor(final ShenyuConfig shenyuConfig, + final ObjectProvider> provider) { final ShenyuConfig.SharedPool sharedPool = shenyuConfig.getSharedPool(); final Integer corePoolSize = sharedPool.getCorePoolSize(); final Integer maximumPoolSize = sharedPool.getMaximumPoolSize(); final Long keepAliveTime = sharedPool.getKeepAliveTime(); - final Long maxWorkQueueMemory = sharedPool.getMaxWorkQueueMemory(); return new ShenyuThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, - TimeUnit.MILLISECONDS, new MemoryLimitedTaskQueue<>(maxWorkQueueMemory, instrumentation), + TimeUnit.MILLISECONDS, provider.getIfAvailable(), ShenyuThreadFactory.create(sharedPool.getPrefix(), true), new ThreadPoolExecutor.AbortPolicy()); } @@ -66,7 +90,7 @@ public ShenyuThreadPoolExecutor shenyuThreadPoolExecutor(final ShenyuConfig shen * @return the shenyu thread pool executor destructor */ @Bean - @ConditionalOnProperty(name = "shenyu.sharedPool.enable", havingValue = "true") + @ConditionalOnBean(ShenyuThreadPoolExecutor.class) public ShenyuThreadPoolExecutorDestructor shenyuThreadPoolExecutorDestructor() { return new ShenyuThreadPoolExecutorDestructor(); }