Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not leak running pools from the internal collection #3885

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -74,7 +73,7 @@ public class ThreadPoolManager {
protected static final long THREAD_TIMEOUT = 65L;
protected static final long THREAD_MONITOR_SLEEP = 60000;

protected static Map<String, ExecutorService> pools = new WeakHashMap<>();
protected static Map<String, ExecutorService> pools = new ConcurrentHashMap<>();

private static Map<String, Integer> configs = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -124,23 +123,17 @@ protected void modified(Map<String, Object> properties) {
* @return an instance to use
*/
public static ScheduledExecutorService getScheduledPool(String poolName) {
ExecutorService pool = pools.get(poolName);
if (pool == null) {
synchronized (pools) {
// do a double check if it is still null or if another thread might have created it meanwhile
pool = pools.get(poolName);
if (pool == null) {
int cfg = getConfig(poolName);
pool = new WrappedScheduledExecutorService(cfg,
new NamedThreadFactory(poolName, true, Thread.NORM_PRIORITY));
((ThreadPoolExecutor) pool).setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS);
((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
((ScheduledThreadPoolExecutor) pool).setRemoveOnCancelPolicy(true);
pools.put(poolName, pool);
LOGGER.debug("Created scheduled thread pool '{}' of size {}", poolName, cfg);
}
}
}
ExecutorService pool = pools.computeIfAbsent(poolName, (name) -> {
int cfg = getConfig(name);
ScheduledThreadPoolExecutor executor = new WrappedScheduledExecutorService(cfg,
new NamedThreadFactory(name, true, Thread.NORM_PRIORITY));
executor.setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(true);
executor.setRemoveOnCancelPolicy(true);
LOGGER.debug("Created scheduled thread pool '{}' of size {}", name, cfg);
return executor;
});

if (pool instanceof ScheduledExecutorService service) {
return new UnstoppableScheduledExecutorService(poolName, service);
} else {
Expand All @@ -156,21 +149,15 @@ public static ScheduledExecutorService getScheduledPool(String poolName) {
* @return an instance to use
*/
public static ExecutorService getPool(String poolName) {
ExecutorService pool = pools.get(poolName);
if (pool == null) {
synchronized (pools) {
// do a double check if it is still null or if another thread might have created it meanwhile
pool = pools.get(poolName);
if (pool == null) {
int cfg = getConfig(poolName);
pool = QueueingThreadPoolExecutor.createInstance(poolName, cfg);
((ThreadPoolExecutor) pool).setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS);
((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
pools.put(poolName, pool);
LOGGER.debug("Created thread pool '{}' with size {}", poolName, cfg);
}
}
}
ExecutorService pool = pools.computeIfAbsent(poolName, (name) -> {
int cfg = getConfig(name);
ThreadPoolExecutor executor = QueueingThreadPoolExecutor.createInstance(name, cfg);
executor.setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(true);
LOGGER.debug("Created thread pool '{}' with size {}", name, cfg);
return executor;
});

return new UnstoppableExecutorService<>(poolName, pool);
}

Expand All @@ -179,9 +166,9 @@ static ThreadPoolExecutor getPoolUnwrapped(String poolName) {
return (ThreadPoolExecutor) ret.getDelegate();
}

static ThreadPoolExecutor getScheduledPoolUnwrapped(String poolName) {
static ScheduledThreadPoolExecutor getScheduledPoolUnwrapped(String poolName) {
UnstoppableExecutorService<?> ret = (UnstoppableScheduledExecutorService) getScheduledPool(poolName);
return (ThreadPoolExecutor) ret.getDelegate();
return (ScheduledThreadPoolExecutor) ret.getDelegate();
}

protected static int getConfig(String poolName) {
Expand Down