From 751fc28749c314f5978dd1696edc015d96b55605 Mon Sep 17 00:00:00 2001 From: shawkins Date: Mon, 24 May 2021 08:27:03 -0400 Subject: [PATCH] further consolidating threading concerns moved some logic into util, creating a common scheduler and thread pool --- .../client/DefaultKubernetesClient.java | 4 +- .../dsl/internal/AbstractWatchManager.java | 57 +++++++------- .../client/dsl/internal/NamedRunnable.java | 46 ------------ .../client/dsl/internal/WatchHTTPManager.java | 46 ++---------- .../internal/WatcherWebSocketListener.java | 47 +++--------- .../client/informers/ResyncRunnable.java | 10 +-- .../informers/SharedInformerFactory.java | 15 +++- .../impl/DefaultSharedIndexInformer.java | 23 +++--- .../CachedSingleThreadScheduler.java} | 26 +++++-- .../{informers => utils}/SerialExecutor.java | 4 +- .../kubernetes/client/utils/Utils.java | 36 ++++++++- .../internal/AbstractWatchManagerTest.java | 75 ------------------- .../DefaultSharedIndexInformerResyncTest.java | 4 +- .../CachedSingleThreadSchedulerTest.java} | 9 +-- 14 files changed, 136 insertions(+), 266 deletions(-) delete mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamedRunnable.java rename kubernetes-client/src/main/java/io/fabric8/kubernetes/client/{informers/SharedScheduler.java => utils/CachedSingleThreadScheduler.java} (80%) rename kubernetes-client/src/main/java/io/fabric8/kubernetes/client/{informers => utils}/SerialExecutor.java (96%) rename kubernetes-client/src/test/java/io/fabric8/kubernetes/client/{informers/SharedSchedulerTest.java => utils/CachedSingleThreadSchedulerTest.java} (90%) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/DefaultKubernetesClient.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/DefaultKubernetesClient.java index a807bee5931..6b6016d9a1a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/DefaultKubernetesClient.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/DefaultKubernetesClient.java @@ -122,13 +122,11 @@ import io.fabric8.kubernetes.client.extended.run.RunOperations; import io.fabric8.kubernetes.client.informers.SharedInformerFactory; import io.fabric8.kubernetes.client.utils.Serialization; -import io.fabric8.kubernetes.client.utils.Utils; import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import okhttp3.OkHttpClient; @@ -577,7 +575,7 @@ public AutoscalingAPIGroupDSL autoscaling() { */ @Override public SharedInformerFactory informers() { - return new SharedInformerFactory(Executors.newCachedThreadPool(Utils.daemonThreadFactory(this)), httpClient, getConfiguration()); + return new SharedInformerFactory(httpClient, getConfiguration()); } /** diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 0ebf076dafd..e54f54a9689 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -16,7 +16,6 @@ package io.fabric8.kubernetes.client.dsl.internal; import io.fabric8.kubernetes.api.model.ListOptions; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; @@ -27,8 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -46,11 +44,12 @@ public abstract class AbstractWatchManager implements Watch { private final int reconnectInterval; private final int maxIntervalExponent; final AtomicInteger currentReconnectAttempt; - private final ScheduledExecutorService executorService; + private ScheduledFuture reconnectAttempt; private final RequestBuilder requestBuilder; protected ClientRunner runner; + private final AtomicBoolean reconnectPending = new AtomicBoolean(false); AbstractWatchManager( Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder @@ -62,7 +61,6 @@ public abstract class AbstractWatchManager implements Watch { this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); this.currentReconnectAttempt = new AtomicInteger(0); this.forceClosed = new AtomicBoolean(); - this.executorService = Executors.newSingleThreadScheduledExecutor(Utils.daemonThreadFactory(AbstractWatchManager.this)); this.requestBuilder = requestBuilder; } @@ -90,30 +88,35 @@ final void closeEvent() { watcher.onClose(); } - final void closeExecutorService() { - if (executorService != null && !executorService.isShutdown()) { - logger.debug("Closing ExecutorService"); - try { - executorService.shutdown(); - if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) { - logger.warn("Executor didn't terminate in time after shutdown in close(), killing it."); - executorService.shutdownNow(); - } - } catch (Exception t) { - throw KubernetesClientException.launderThrowable(t); - } + final synchronized void cancelReconnect() { + if (reconnectAttempt != null) { + reconnectAttempt.cancel(true); } } - - void submit(Runnable task) { - if (!executorService.isShutdown()) { - executorService.submit(task); + + void scheduleReconnect(Runnable command, boolean shouldBackoff) { + if (!reconnectPending.compareAndSet(false, true)) { + logger.debug("Reconnect already scheduled"); + return; } - } - - void schedule(Runnable command, long delay, TimeUnit timeUnit) { - if (!executorService.isShutdown()) { - executorService.schedule(command, delay, timeUnit); + + logger.debug("Scheduling reconnect task"); + + long delay = shouldBackoff + ? nextReconnectInterval() + : 0; + + synchronized (this) { + reconnectAttempt = Utils.schedule(Utils.getCommonExecutorSerive(), () -> { + try { + command.run(); + } finally { + reconnectPending.set(false); + } + }, delay, TimeUnit.MILLISECONDS); + if (forceClosed.get()) { + cancelReconnect(); + } } } @@ -179,7 +182,7 @@ public void close() { logger.debug("Force closing the watch {}", this); closeEvent(); runner.close(); - closeExecutorService(); + cancelReconnect(); } @FunctionalInterface diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamedRunnable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamedRunnable.java deleted file mode 100644 index 3928cde14d1..00000000000 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamedRunnable.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.dsl.internal; - -import java.util.Objects; - -abstract class NamedRunnable implements Runnable { - private final String name; - - NamedRunnable(String name) { - this.name = Objects.requireNonNull(name); - } - - private void tryToSetName(String value) { - try { - Thread.currentThread().setName(value); - } catch (SecurityException ignored) { - // Ignored - } - } - - public final void run() { - String oldName = Thread.currentThread().getName(); - tryToSetName(this.name + "|" + oldName); - try { - execute(); - } finally { - tryToSetName(oldName); - } - } - - protected abstract void execute(); -} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 0ab76a81497..a21bb6e4858 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -18,9 +18,7 @@ import java.io.IOException; import java.net.MalformedURLException; import java.util.List; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResource; @@ -96,7 +94,6 @@ OkHttpClient cloneAndCustomize(OkHttpClient client) { private abstract static class HTTPClientRunner extends AbstractWatchManager.ClientRunner { private final AbstractWatchManager manager; - private final AtomicBoolean reconnectPending = new AtomicBoolean(false); public HTTPClientRunner(OkHttpClient client, AbstractWatchManager manager) { super(client); @@ -155,43 +152,16 @@ private void scheduleReconnect(boolean shouldBackoff) { return; } - logger.debug("Submitting reconnect task to the executor"); - - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor. - manager.submit(() -> { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } + manager.scheduleReconnect(() -> { try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - - long delay = shouldBackoff - ? manager.nextReconnectInterval() - : 0; - - manager.schedule(() -> { - try { - manager.runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - close(); - manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e)); - } - }, delay, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - // This is a standard exception if we close the scheduler. We should not print it - if (!manager.isForceClosed()) { - logger.error("Exception in reconnect", e); - } - reconnectPending.set(false); + manager.runWatch(); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); + close(); + manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e)); } - }); + }, shouldBackoff); } public void onMessage(String messageSource) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index b42dccab410..617cdb1d9ea 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -16,8 +16,6 @@ package io.fabric8.kubernetes.client.dsl.internal; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -43,7 +41,7 @@ abstract class WatcherWebSocketListener extends WebSocketListener { * True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. */ private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean reconnectPending = new AtomicBoolean(false); + /** * Blocking queue for startup exceptions. */ @@ -152,40 +150,17 @@ public void onClosed(WebSocket webSocket, int code, String reason) { } private void scheduleReconnect() { - logger.debug("Submitting reconnect task to the executor"); - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor - manager.submit(new NamedRunnable("scheduleReconnect") { - @Override - public void execute() { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } + webSocketRef.set(null); + manager.scheduleReconnect(() -> { + try { + manager.runWatch(); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); webSocketRef.set(null); - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - manager.schedule(new NamedRunnable("reconnectAttempt") { - @Override - public void execute() { - try { - manager.runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - webSocketRef.set(null); - manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); - manager.close(); - } - } - }, manager.nextReconnectInterval(), TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - reconnectPending.set(false); - } + manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); + manager.close(); } - }); + }, true); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java index d4c78da48c9..bd0730862b9 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java @@ -19,7 +19,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.function.BooleanSupplier; +import java.util.function.Supplier; /** * Calls the resync function of store interface which is always implemented @@ -30,9 +30,9 @@ public class ResyncRunnable implements Runnable { private static final Logger log = LoggerFactory.getLogger(ResyncRunnable.class); private Store store; - private BooleanSupplier shouldResyncFunc; + private Supplier shouldResyncFunc; - public ResyncRunnable(Store store, BooleanSupplier shouldResyncFunc) { + public ResyncRunnable(Store store, Supplier shouldResyncFunc) { this.store = store; this.shouldResyncFunc = shouldResyncFunc; } @@ -42,9 +42,7 @@ public void run() { log.debug("ResyncRunnable#resync .. .."); } - // if there is a concern that this processing could overwhelm the single - // thread, then hand this off to the common pool - if (shouldResyncFunc == null || shouldResyncFunc.getAsBoolean()) { + if (shouldResyncFunc == null || shouldResyncFunc.get()) { if (log.isDebugEnabled()) { log.debug("ResyncRunnable#force resync"); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java index dc37d1cf7c1..f2bf14ca1d6 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java @@ -56,12 +56,21 @@ public class SharedInformerFactory extends BaseOperation { private final Map startedInformers = new HashMap<>(); private final ExecutorService informerExecutor; - private final SharedScheduler resyncExecutor = new SharedScheduler(); private final BaseOperation baseOperation; private final ConcurrentLinkedQueue eventListeners = new ConcurrentLinkedQueue<>(); + private boolean allowShutdown = true; + + public SharedInformerFactory(OkHttpClient okHttpClient, Config configuration) { + // ideally this should be bounded. The current implication is that there + // can be 1 thread used (not dedicated to) per informer - which + // could be problematic for a large number of informers. however + // there is already a superceding issue there of thread utilization by okhttp + this(Utils.getCommonExecutorSerive(), okHttpClient, configuration); + this.allowShutdown = false; + } /** * Constructor with thread pool specified. * @@ -234,7 +243,7 @@ private synchronized context = context.withIsNamespaceConfiguredFromGlobalConfig(false); } } - SharedIndexInformer informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, informerExecutor, resyncExecutor); + SharedIndexInformer informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, informerExecutor); this.informers.put(getInformerKey(context), informer); return informer; } @@ -322,7 +331,7 @@ public synchronized void stopAllRegisteredInformers(boolean shutDownThreadPool) informer.stop(); } }); - if (shutDownThreadPool) { + if (shutDownThreadPool && allowShutdown) { informerExecutor.shutdown(); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 2ce034e32ff..e44bf825633 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -21,15 +21,15 @@ import io.fabric8.kubernetes.client.informers.ListerWatcher; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.ResyncRunnable; -import io.fabric8.kubernetes.client.informers.SerialExecutor; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.fabric8.kubernetes.client.informers.SharedScheduler; import io.fabric8.kubernetes.client.informers.cache.Cache; import io.fabric8.kubernetes.client.informers.cache.Indexer; import io.fabric8.kubernetes.client.informers.cache.ProcessorListener; import io.fabric8.kubernetes.client.informers.cache.ProcessorStore; import io.fabric8.kubernetes.client.informers.cache.Reflector; import io.fabric8.kubernetes.client.informers.cache.SharedProcessor; +import io.fabric8.kubernetes.client.utils.SerialExecutor; +import io.fabric8.kubernetes.client.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,8 +39,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BooleanSupplier; import java.util.function.Function; +import java.util.function.Supplier; public class DefaultSharedIndexInformer> implements SharedIndexInformer { private static final Logger log = LoggerFactory.getLogger(DefaultSharedIndexInformer.class); @@ -56,30 +56,27 @@ public class DefaultSharedIndexInformer reflector; private final Class apiTypeClass; - private final ProcessorStore processorStore; - - private Cache indexer; - - private SharedProcessor processor; + private final Cache indexer; + private final SharedProcessor processor; + private final Executor informerExecutor; private AtomicBoolean started = new AtomicBoolean(); private volatile boolean stopped = false; private ScheduledFuture resyncFuture; - public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, Executor informerExecutor, SharedScheduler resyncExecutor) { + public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, Executor informerExecutor) { if (resyncPeriod < 0) { throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value"); } this.resyncCheckPeriodMillis = resyncPeriod; this.defaultEventHandlerResyncPeriod = resyncPeriod; - this.resyncExecutor = resyncExecutor; this.apiTypeClass = apiTypeClass; + this.informerExecutor = informerExecutor; // reuse the informer executor, but ensure serial processing this.processor = new SharedProcessor<>(new SerialExecutor(informerExecutor)); this.indexer = new Cache<>(); @@ -210,11 +207,11 @@ public boolean isRunning() { return !stopped && started.get() && reflector.isRunning(); } - synchronized void scheduleResync(BooleanSupplier resyncFunc) { + synchronized void scheduleResync(Supplier resyncFunc) { // schedule the resync runnable if (resyncCheckPeriodMillis > 0) { ResyncRunnable resyncRunnable = new ResyncRunnable<>(processorStore, resyncFunc); - resyncFuture = resyncExecutor.scheduleWithFixedDelay(resyncRunnable, resyncCheckPeriodMillis, resyncCheckPeriodMillis, TimeUnit.MILLISECONDS); + resyncFuture = Utils.scheduleAtFixedRate(informerExecutor, resyncRunnable, resyncCheckPeriodMillis, resyncCheckPeriodMillis, TimeUnit.MILLISECONDS); } else { log.debug("informer#Controller: resync skipped due to 0 full resync period {}", apiTypeClass); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/CachedSingleThreadScheduler.java similarity index 80% rename from kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java rename to kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/CachedSingleThreadScheduler.java index b5a3a786872..c813f76fef2 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/CachedSingleThreadScheduler.java @@ -14,34 +14,33 @@ * limitations under the License. */ -package io.fabric8.kubernetes.client.informers; - -import io.fabric8.kubernetes.client.utils.Utils; +package io.fabric8.kubernetes.client.utils; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** - * Maintains a single thread daemon scheduler. + * Maintains a single thread daemon scheduler, which will terminate the thread + * when not in use. * *
It is not intended for long-running tasks, * but it does not assume the task can be handed off to the common pool * *
This is very similar to the CompletableFuture.Delayer, but provides a scheduler method */ -public class SharedScheduler { +public class CachedSingleThreadScheduler { public static final long DEFAULT_TTL_MILLIS = TimeUnit.SECONDS.toMillis(10); private final long ttlMillis; private ScheduledThreadPoolExecutor executor; - public SharedScheduler() { + public CachedSingleThreadScheduler() { this(DEFAULT_TTL_MILLIS); } - public SharedScheduler(long ttlMillis) { + public CachedSingleThreadScheduler(long ttlMillis) { this.ttlMillis = ttlMillis; } @@ -49,13 +48,24 @@ public synchronized ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + startExecutor(); + return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + public synchronized ScheduledFuture schedule(Runnable command, + long delay, + TimeUnit unit) { + startExecutor(); + return executor.schedule(command, delay, unit); + } + + private void startExecutor() { if (executor == null) { // start the executor and add a ttl task executor = new ScheduledThreadPoolExecutor(1, Utils.daemonThreadFactory(this)); executor.setRemoveOnCancelPolicy(true); executor.scheduleWithFixedDelay(this::shutdownCheck, ttlMillis, ttlMillis, TimeUnit.MILLISECONDS); } - return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); } /** diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/SerialExecutor.java similarity index 96% rename from kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java rename to kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/SerialExecutor.java index 3aee8c27a2e..a167361f64f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/SerialExecutor.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.fabric8.kubernetes.client.informers; +package io.fabric8.kubernetes.client.utils; import java.util.ArrayDeque; import java.util.Queue; @@ -52,4 +52,4 @@ protected synchronized void scheduleNext() { executor.execute(active); } } -} \ No newline at end of file +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java index 87db484be71..26fb4d556c7 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java @@ -41,8 +41,10 @@ import java.util.Optional; import java.util.Random; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -59,6 +61,9 @@ public class Utils { public static final String PATH_WINDOWS = "Path"; public static final String PATH_UNIX = "PATH"; private static final Random random = new Random(); + + private static final ExecutorService SHARED_POOL = Executors.newCachedThreadPool(); + private static final CachedSingleThreadScheduler SHARED_SCHEDULER = new CachedSingleThreadScheduler(); private Utils() { } @@ -458,17 +463,46 @@ private static String getOperatingSystemFromSystemProperty() { * name based upon the object passed in. */ public static ThreadFactory daemonThreadFactory(Object forObject) { + String name = forObject.getClass().getSimpleName() + "-" + System.identityHashCode(forObject); + return daemonThreadFactory(name); + } + + static ThreadFactory daemonThreadFactory(String name) { return new ThreadFactory() { ThreadFactory threadFactory = Executors.defaultThreadFactory(); @Override public Thread newThread(Runnable r) { Thread ret = threadFactory.newThread(r); - ret.setName(forObject.getClass().getSimpleName() + "-" + System.identityHashCode(forObject) + "-" + ret.getName()); + ret.setName(name + "-" + ret.getName()); ret.setDaemon(true); return ret; } }; } + + /** + * Schedule a task to run in the given {@link Executor} - which should run the task in a different thread as to not + * hold the scheduling thread + */ + public static ScheduledFuture schedule(Executor executor, Runnable command, long delay, TimeUnit unit) { + return SHARED_SCHEDULER.schedule(() -> executor.execute(command), delay, unit); + } + /** + * Schedule a repeated task to run in the given {@link Executor} - which should run the task in a different thread as to not + * hold the scheduling thread + */ + public static ScheduledFuture scheduleAtFixedRate(Executor executor, Runnable command, long initialDelay, long delay, TimeUnit unit) { + // because of the hand-off to the other executor, there's no difference between rate and delay + return SHARED_SCHEDULER.scheduleWithFixedDelay(() -> executor.execute(command), initialDelay, delay, unit); + } + + /** + * Get the common executor service - callers should not shutdown this service + */ + public static ExecutorService getCommonExecutorSerive() { + return SHARED_POOL; + } + } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java index a1a482b2ab8..ecd15b2a5fa 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java @@ -29,18 +29,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; class AbstractWatchManagerTest { @@ -98,78 +95,6 @@ void closeWebSocket() { verify(webSocket, times(1)).close(1000, null); } - @Test - @DisplayName("closeExecutorService, with graceful termination") - void closeExecutorServiceGracefully() throws InterruptedException{ - // Given - final WatchManager awm = withDefaultWatchManager(null); - when(executorService.awaitTermination(1, TimeUnit.SECONDS)).thenReturn(true); - // When - awm.closeExecutorService(); - // Then - verify(executorService, times(1)).shutdown(); - verify(executorService, times(0)).shutdownNow(); - } - - @Test - @DisplayName("closeExecutorService, with shutdownNow") - void closeExecutorServiceNow() throws InterruptedException { - // Given - final WatchManager awm = withDefaultWatchManager(null); - when(executorService.awaitTermination(1, TimeUnit.SECONDS)).thenReturn(false); - // When - awm.closeExecutorService(); - // Then - verify(executorService, times(1)).shutdown(); - verify(executorService, times(1)).shutdownNow(); - } - - @Test - @DisplayName("submit, executor not shutdown, should submit") - void submitWhenIsNotShutdown() { - // Given - final WatchManager awm = withDefaultWatchManager(null); - // When - awm.submit(() -> {}); - // Then - verify(executorService, times(1)).submit(any(Runnable.class)); - } - - @Test - @DisplayName("submit, executor shutdown, should NOT submit") - void submitWhenIsShutdown() { - // Given - final WatchManager awm = withDefaultWatchManager(null); - when(executorService.isShutdown()).thenReturn(true); - // When - awm.submit(() -> {}); - // Then - verify(executorService, times(0)).submit(any(Runnable.class)); - } - - @Test - @DisplayName("schedule, executor not shutdown, should submit") - void scheduleWhenIsNotShutdown() { - // Given - final WatchManager awm = withDefaultWatchManager(null); - // When - awm.schedule(() -> {}, 0, TimeUnit.SECONDS); - // Then - verify(executorService, times(1)).schedule(any(Runnable.class), anyLong(), any()); - } - - @Test - @DisplayName("schedule, executor shutdown, should NOT submit") - void scheduleWhenIsShutdown() { - // Given - final WatchManager awm = withDefaultWatchManager(null); - when(executorService.isShutdown()).thenReturn(true); - // When - awm.schedule(() -> {}, 0, TimeUnit.SECONDS); - // Then - verify(executorService, times(0)).schedule(any(Runnable.class), anyLong(), any()); - } - @Test @DisplayName("nextReconnectInterval, returns exponential interval values up to the provided limit") void nextReconnectInterval() { diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java index ee1b0875805..fa438e99eb1 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java @@ -19,7 +19,6 @@ import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.client.dsl.base.OperationContext; import io.fabric8.kubernetes.client.informers.ListerWatcher; -import io.fabric8.kubernetes.client.informers.SharedScheduler; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -39,11 +38,10 @@ private abstract static class AbstractPodListerWatcher implements ListerWatcher< private static final Long WAIT_TIME = 500L; private final ListerWatcher listerWatcher = Mockito.mock(AbstractPodListerWatcher.class, Mockito.RETURNS_DEEP_STUBS); private final OperationContext operationContext = Mockito.mock(OperationContext.class, Mockito.RETURNS_DEEP_STUBS); - private SharedScheduler sharedScheduler = new SharedScheduler(); DefaultSharedIndexInformer defaultSharedIndexInformer; private DefaultSharedIndexInformer createDefaultSharedIndexInformer(long resyncPeriod) { - defaultSharedIndexInformer = new DefaultSharedIndexInformer<>(Pod.class, listerWatcher, resyncPeriod, operationContext, Runnable::run, sharedScheduler); + defaultSharedIndexInformer = new DefaultSharedIndexInformer<>(Pod.class, listerWatcher, resyncPeriod, operationContext, Runnable::run); return defaultSharedIndexInformer; } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/CachedSingleThreadSchedulerTest.java similarity index 90% rename from kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java rename to kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/CachedSingleThreadSchedulerTest.java index 4cd419c1179..7915b3b0e73 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/CachedSingleThreadSchedulerTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.fabric8.kubernetes.client.informers; +package io.fabric8.kubernetes.client.utils; import org.junit.jupiter.api.Test; @@ -22,16 +22,15 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.awaitility.Awaitility.await; - -class SharedSchedulerTest { +class CachedSingleThreadSchedulerTest { @Test void testAutoShutdown() throws InterruptedException { - SharedScheduler scheduler = new SharedScheduler(50); + CachedSingleThreadScheduler scheduler = new CachedSingleThreadScheduler(50); assertFalse(scheduler.hasExecutor()); CountDownLatch latch = new CountDownLatch(2); ScheduledFuture future = scheduler.scheduleWithFixedDelay(latch::countDown, 50, 50, TimeUnit.MILLISECONDS);