Skip to content

Commit

Permalink
[ISSUE #3303] [type: refactor] refactor shared thread pool (#3315)
Browse files Browse the repository at this point in the history
* [ISSUE #3303] [type: refactor] refactor shared thread pool

* fix comment
  • Loading branch information
loongs-zhang authored Apr 25, 2022
1 parent d7d22f2 commit 86302d4
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shenyu.common.concurrent;

import java.util.concurrent.ExecutorService;

/**
* EagerExecutorService.
*/
public interface EagerExecutorService extends ExecutorService {

/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
int getPoolSize();

/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
int getActiveCount();

/**
* Returns the maximum allowed number of threads.
*
* @return the maximum allowed number of threads
*/
int getMaximumPoolSize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.shenyu.common.concurrent;

import java.lang.instrument.Instrumentation;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
* MemoryLimitedTaskQueue in the {@link org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor}.
Expand All @@ -28,11 +26,11 @@
* That can make the executor create new worker
* when the task num is bigger than corePoolSize but less than maximumPoolSize.
*/
public class MemoryLimitedTaskQueue<R extends Runnable> extends MemoryLimitedLinkedBlockingQueue<Runnable> {
public class MemoryLimitedTaskQueue<R extends Runnable> extends MemoryLimitedLinkedBlockingQueue<Runnable> implements TaskQueue<Runnable> {

private static final long serialVersionUID = -2635853580887179627L;

private ShenyuThreadPoolExecutor executor;
private EagerExecutorService executor;

public MemoryLimitedTaskQueue(final Instrumentation inst) {
super(inst);
Expand All @@ -42,50 +40,18 @@ public MemoryLimitedTaskQueue(final long memoryLimit, final Instrumentation inst
super(memoryLimit, inst);
}

/**
* set the executor.
*
* @param executor executor
*/
public void setExecutor(final ShenyuThreadPoolExecutor executor) {
this.executor = executor;
@Override
public EagerExecutorService getExecutor() {
return executor;
}

@Override
public boolean offer(final Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}

int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
if (executor.getActiveCount() < currentPoolThreadSize) {
return super.offer(runnable);
}

// return false to let executor create new worker.
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}

// currentPoolThreadSize >= max
return super.offer(runnable);
public void setExecutor(final EagerExecutorService executor) {
this.executor = executor;
}

/**
* retry offer task.
*
* @param o task
* @param timeout timeout
* @param unit timeout unit
* @return offer success or not
* @throws java.util.concurrent.RejectedExecutionException if executor is terminated.
* @throws java.lang.InterruptedException if the current thread is interrupted.
*/
public boolean retryOffer(final Runnable o, final long timeout, final TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
@Override
public boolean doOffer(final Runnable runnable) {
return super.offer(runnable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@
/**
* ShenyuThreadPoolExecutor.
*/
public class ShenyuThreadPoolExecutor extends ThreadPoolExecutor {
public class ShenyuThreadPoolExecutor extends ThreadPoolExecutor implements EagerExecutorService {

public ShenyuThreadPoolExecutor(final int corePoolSize,
final int maximumPoolSize,
final long keepAliveTime,
final TimeUnit unit,
final MemoryLimitedTaskQueue<Runnable> workQueue,
final TaskQueue<Runnable> workQueue,
final ThreadFactory threadFactory,
final RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
workQueue.setExecutor(this);
}

@Override
Expand All @@ -48,8 +49,7 @@ public void execute(final Runnable command) {
super.execute(command);
} catch (RejectedExecutionException e) {
// retry to offer the task into queue.
@SuppressWarnings("all")
final MemoryLimitedTaskQueue queue = (MemoryLimitedTaskQueue) super.getQueue();
final TaskQueue<Runnable> queue = (TaskQueue<Runnable>) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
throw new RejectedExecutionException("Queue capacity is full.", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shenyu.common.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
* TaskQueue.
*/
public interface TaskQueue<E> extends BlockingQueue<E> {

/**
* get executor.
*
* @return the executor
*/
EagerExecutorService getExecutor();

/**
* set the executor.
*
* @param executor executor
*/
void setExecutor(EagerExecutorService executor);

@Override
default boolean offer(final E e) {
if (getExecutor() == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}

int currentPoolThreadSize = getExecutor().getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
if (getExecutor().getActiveCount() < currentPoolThreadSize) {
return doOffer(e);
}

// return false to let executor create new worker.
if (currentPoolThreadSize < getExecutor().getMaximumPoolSize()) {
return false;
}

// currentPoolThreadSize >= max
return doOffer(e);
}

/**
* offer element to the queue.
*
* @param e the element to add
* @return {@code true} if the element was added to this queue, else {@code false}
*/
boolean doOffer(E e);

/**
* retry offer task.
*
* @param o task
* @param timeout timeout
* @param unit timeout unit
* @return offer success or not
* @throws java.util.concurrent.RejectedExecutionException if executor is terminated.
* @throws java.lang.InterruptedException if the current thread is interrupted.
*/
default boolean retryOffer(final E o, final long timeout, final TimeUnit unit) throws InterruptedException {
if (getExecutor().isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return offer(o, timeout, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
import org.apache.shenyu.common.concurrent.MemoryLimitedTaskQueue;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor;
import org.apache.shenyu.common.concurrent.TaskQueue;
import org.apache.shenyu.common.config.ShenyuConfig;
import org.apache.shenyu.common.exception.ShenyuException;
import org.apache.shenyu.plugin.api.utils.SpringBeanUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
Expand All @@ -39,23 +44,42 @@
@Configuration
public class ShenyuThreadPoolConfiguration {

/**
* MemoryLimitedTaskQueue.
*
* @param shenyuConfig the shenyu config
* @return instance of {@link MemoryLimitedTaskQueue}
*/
@Bean
@ConditionalOnMissingBean(TaskQueue.class)
@ConditionalOnProperty("shenyu.sharedPool.maxWorkQueueMemory")
public TaskQueue<Runnable> memoryLimitedTaskQueue(final ShenyuConfig shenyuConfig) {
final Instrumentation instrumentation = ByteBuddyAgent.install();
final ShenyuConfig.SharedPool sharedPool = shenyuConfig.getSharedPool();
final Long maxWorkQueueMemory = sharedPool.getMaxWorkQueueMemory();
if (maxWorkQueueMemory <= 0) {
throw new ShenyuException("${shenyu.sharedPool.maxWorkQueueMemory} must bigger than 0 !");
}
return new MemoryLimitedTaskQueue<>(maxWorkQueueMemory, instrumentation);
}

/**
* crate shenyu shared thread pool executor.
*
* @param shenyuConfig the shenyu config
* @param provider the queue bean provider
* @return the shenyu thread pool executor
*/
@Bean
@ConditionalOnProperty(name = "shenyu.sharedPool.enable", havingValue = "true")
public ShenyuThreadPoolExecutor shenyuThreadPoolExecutor(final ShenyuConfig shenyuConfig) {
final Instrumentation instrumentation = ByteBuddyAgent.install();
public ShenyuThreadPoolExecutor shenyuThreadPoolExecutor(final ShenyuConfig shenyuConfig,
final ObjectProvider<TaskQueue<Runnable>> provider) {
final ShenyuConfig.SharedPool sharedPool = shenyuConfig.getSharedPool();
final Integer corePoolSize = sharedPool.getCorePoolSize();
final Integer maximumPoolSize = sharedPool.getMaximumPoolSize();
final Long keepAliveTime = sharedPool.getKeepAliveTime();
final Long maxWorkQueueMemory = sharedPool.getMaxWorkQueueMemory();
return new ShenyuThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
TimeUnit.MILLISECONDS, new MemoryLimitedTaskQueue<>(maxWorkQueueMemory, instrumentation),
TimeUnit.MILLISECONDS, provider.getIfAvailable(),
ShenyuThreadFactory.create(sharedPool.getPrefix(), true),
new ThreadPoolExecutor.AbortPolicy());
}
Expand All @@ -66,7 +90,7 @@ public ShenyuThreadPoolExecutor shenyuThreadPoolExecutor(final ShenyuConfig shen
* @return the shenyu thread pool executor destructor
*/
@Bean
@ConditionalOnProperty(name = "shenyu.sharedPool.enable", havingValue = "true")
@ConditionalOnBean(ShenyuThreadPoolExecutor.class)
public ShenyuThreadPoolExecutorDestructor shenyuThreadPoolExecutorDestructor() {
return new ShenyuThreadPoolExecutorDestructor();
}
Expand Down

0 comments on commit 86302d4

Please sign in to comment.