further consolidating threading concerns
moved some logic into util, creating a common scheduler and thread pool
shawkins authored and manusa committed May 24, 2021
1 parent 4e8c130 commit 751fc28
Showing 14 changed files with 136 additions and 266 deletions.
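At a high level, the change replaces per-watch and per-informer executors with one client-wide scheduler and thread pool reached through Utils. The Utils side is not part of this excerpt, so the sketch below only illustrates the shape implied by the calls visible in the diff (Utils.getCommonExecutorSerive(), Utils.schedule(...)); the class, field, and thread names here are invented for the example.

```java
// Illustrative sketch only: the real io.fabric8.kubernetes.client.utils.Utils
// changes are not in this excerpt. This shows one plausible shape for a
// process-wide pool plus a single timing thread, matching the calls used in
// the diff below.
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class CommonThreading {

  // Shared work pool; daemon threads so an application can still exit
  // without explicitly shutting the client down.
  private static final ExecutorService COMMON_POOL =
      Executors.newCachedThreadPool(CommonThreading::daemonThread);

  // A single thread suffices for timing; the work itself is handed off.
  private static final ScheduledExecutorService SCHEDULER =
      Executors.newSingleThreadScheduledExecutor(CommonThreading::daemonThread);

  private CommonThreading() {
  }

  private static Thread daemonThread(Runnable r) {
    Thread t = new Thread(r, "fabric8-common");
    t.setDaemon(true);
    return t;
  }

  static ExecutorService getCommonExecutorService() {
    return COMMON_POOL;
  }

  // Schedule a one-shot task: the scheduler thread only fires the timer,
  // then the task runs on the supplied executor, so long-running work can
  // never delay other timers.
  static ScheduledFuture<?> schedule(Executor executor, Runnable command,
      long delay, TimeUnit unit) {
    return SCHEDULER.schedule(() -> executor.execute(command), delay, unit);
  }
}
```

Daemon threads are the important detail in such a design: a static, process-wide pool built from non-daemon threads would keep the JVM alive after the last client is closed.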
@@ -122,13 +122,11 @@
 import io.fabric8.kubernetes.client.extended.run.RunOperations;
 import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
 import io.fabric8.kubernetes.client.utils.Serialization;
-import io.fabric8.kubernetes.client.utils.Utils;
 
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import okhttp3.OkHttpClient;
 
@@ -577,7 +575,7 @@ public AutoscalingAPIGroupDSL autoscaling() {
    */
   @Override
   public SharedInformerFactory informers() {
-    return new SharedInformerFactory(Executors.newCachedThreadPool(Utils.daemonThreadFactory(this)), httpClient, getConfiguration());
+    return new SharedInformerFactory(httpClient, getConfiguration());
   }
 
   /**
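For callers, informers() keeps its signature; it simply no longer allocates a dedicated cached thread pool per call. A minimal, hypothetical usage sketch (application code, not part of the commit):

```java
// Hypothetical application code; the client and factory calls are as in
// the diff, the surrounding setup is an assumption for illustration.
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;

public class InformerFactoryUsage {
  public static void main(String[] args) throws Exception {
    try (KubernetesClient client = new DefaultKubernetesClient()) {
      // now backed by the client's common executor rather than a fresh
      // Executors.newCachedThreadPool(...) per call
      SharedInformerFactory factory = client.informers();
      // ... register informers here ...
      factory.startAllRegisteredInformers();
      // requesting a pool shutdown becomes a no-op for the shared pool,
      // guarded by allowShutdown (see the SharedInformerFactory diff below)
      factory.stopAllRegisteredInformers(true);
    }
  }
}
```

Applications that need to own the threads can still build a SharedInformerFactory through the constructor with a thread pool specified, which this commit keeps.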
@@ -16,7 +16,6 @@
 package io.fabric8.kubernetes.client.dsl.internal;
 
 import io.fabric8.kubernetes.api.model.ListOptions;
-import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.Watch;
 import io.fabric8.kubernetes.client.Watcher;
 import io.fabric8.kubernetes.client.WatcherException;
@@ -27,8 +26,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -46,11 +44,12 @@ public abstract class AbstractWatchManager<T> implements Watch {
   private final int reconnectInterval;
   private final int maxIntervalExponent;
   final AtomicInteger currentReconnectAttempt;
-  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> reconnectAttempt;
 
   private final RequestBuilder requestBuilder;
   protected ClientRunner runner;
 
+  private final AtomicBoolean reconnectPending = new AtomicBoolean(false);
+
   AbstractWatchManager(
     Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder
@@ -62,7 +61,6 @@ public abstract class AbstractWatchManager<T> implements Watch {
     this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
     this.currentReconnectAttempt = new AtomicInteger(0);
     this.forceClosed = new AtomicBoolean();
-    this.executorService = Executors.newSingleThreadScheduledExecutor(Utils.daemonThreadFactory(AbstractWatchManager.this));
 
     this.requestBuilder = requestBuilder;
   }
@@ -90,30 +88,35 @@ final void closeEvent() {
     watcher.onClose();
   }
 
-  final void closeExecutorService() {
-    if (executorService != null && !executorService.isShutdown()) {
-      logger.debug("Closing ExecutorService");
-      try {
-        executorService.shutdown();
-        if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
-          logger.warn("Executor didn't terminate in time after shutdown in close(), killing it.");
-          executorService.shutdownNow();
-        }
-      } catch (Exception t) {
-        throw KubernetesClientException.launderThrowable(t);
-      }
+  final synchronized void cancelReconnect() {
+    if (reconnectAttempt != null) {
+      reconnectAttempt.cancel(true);
     }
   }
 
-  void submit(Runnable task) {
-    if (!executorService.isShutdown()) {
-      executorService.submit(task);
-    }
-  }
-
-  void schedule(Runnable command, long delay, TimeUnit timeUnit) {
-    if (!executorService.isShutdown()) {
-      executorService.schedule(command, delay, timeUnit);
+  void scheduleReconnect(Runnable command, boolean shouldBackoff) {
+    if (!reconnectPending.compareAndSet(false, true)) {
+      logger.debug("Reconnect already scheduled");
+      return;
+    }
+
+    logger.debug("Scheduling reconnect task");
+
+    long delay = shouldBackoff
+        ? nextReconnectInterval()
+        : 0;
+
+    synchronized (this) {
+      reconnectAttempt = Utils.schedule(Utils.getCommonExecutorSerive(), () -> {
+        try {
+          command.run();
+        } finally {
+          reconnectPending.set(false);
+        }
+      }, delay, TimeUnit.MILLISECONDS);
+      if (forceClosed.get()) {
+        cancelReconnect();
+      }
     }
   }

@@ -179,7 +182,7 @@ public void close() {
     logger.debug("Force closing the watch {}", this);
     closeEvent();
     runner.close();
-    closeExecutorService();
+    cancelReconnect();
   }
 
   @FunctionalInterface
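Stripped of the watch plumbing, the logic consolidated into AbstractWatchManager above is: at most one pending reconnect at a time, an optional exponential backoff, and a cancellable handle in place of a per-watch ScheduledExecutorService. A self-contained sketch of that pattern follows; the class name, the dedicated scheduler, and the exact backoff formula are assumptions (nextReconnectInterval() itself is not shown in this excerpt).

```java
// Standalone sketch of the pattern above; invented class name, and the
// backoff formula is an assumption inferred from the reconnectInterval /
// maxIntervalExponent fields.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class ReconnectDemo {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicBoolean reconnectPending = new AtomicBoolean(false);
  private final AtomicInteger currentReconnectAttempt = new AtomicInteger();
  private final long reconnectIntervalMillis = 1_000;
  private final int maxIntervalExponent = 5;
  private ScheduledFuture<?> reconnectAttempt;

  // Assumed shape: base interval doubled per attempt, capped after
  // maxIntervalExponent doublings.
  long nextReconnectInterval() {
    int exponent = Math.min(currentReconnectAttempt.getAndIncrement(),
        maxIntervalExponent);
    return reconnectIntervalMillis << exponent;
  }

  void scheduleReconnect(Runnable command, boolean shouldBackoff) {
    if (!reconnectPending.compareAndSet(false, true)) {
      return; // a reconnect is already queued; do not stack another
    }
    long delay = shouldBackoff ? nextReconnectInterval() : 0;
    synchronized (this) {
      reconnectAttempt = scheduler.schedule(() -> {
        try {
          command.run();
        } finally {
          reconnectPending.set(false); // allow the next attempt
        }
      }, delay, TimeUnit.MILLISECONDS);
    }
  }

  // Cancelling the pending future replaces the old "shut down the whole
  // per-watch executor" teardown.
  synchronized void cancelReconnect() {
    if (reconnectAttempt != null) {
      reconnectAttempt.cancel(true);
    }
  }
}
```

The real method also re-checks forceClosed inside the synchronized block and cancels immediately, so a watch closed during scheduling does not leave a live timer behind.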

This file was deleted.

@@ -18,9 +18,7 @@
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.util.List;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.KubernetesResource;
@@ -96,7 +94,6 @@ OkHttpClient cloneAndCustomize(OkHttpClient client) {
 
   private abstract static class HTTPClientRunner<T extends HasMetadata> extends AbstractWatchManager.ClientRunner {
     private final AbstractWatchManager<T> manager;
-    private final AtomicBoolean reconnectPending = new AtomicBoolean(false);
 
     public HTTPClientRunner(OkHttpClient client, AbstractWatchManager<T> manager) {
       super(client);
@@ -155,43 +152,16 @@ private void scheduleReconnect(boolean shouldBackoff) {
       return;
     }
 
-    logger.debug("Submitting reconnect task to the executor");
-
-    // make sure that whichever thread calls this method, the tasks are
-    // performed serially in the executor.
-    manager.submit(() -> {
-      if (!reconnectPending.compareAndSet(false, true)) {
-        logger.debug("Reconnect already scheduled");
-        return;
-      }
-      try {
-        // actual reconnect only after the back-off time has passed, without
-        // blocking the thread
-        logger.debug("Scheduling reconnect task");
-
-        long delay = shouldBackoff
-          ? manager.nextReconnectInterval()
-          : 0;
-
-        manager.schedule(() -> {
-          try {
-            manager.runWatch();
-            reconnectPending.set(false);
-          } catch (Exception e) {
-            // An unexpected error occurred and we didn't even get an onFailure callback.
-            logger.error("Exception in reconnect", e);
-            close();
-            manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e));
-          }
-        }, delay, TimeUnit.MILLISECONDS);
-      } catch (RejectedExecutionException e) {
-        // This is a standard exception if we close the scheduler. We should not print it
-        if (!manager.isForceClosed()) {
-          logger.error("Exception in reconnect", e);
-        }
-        reconnectPending.set(false);
-      }
-    });
+    manager.scheduleReconnect(() -> {
+      try {
+        manager.runWatch();
+      } catch (Exception e) {
+        // An unexpected error occurred and we didn't even get an onFailure callback.
+        logger.error("Exception in reconnect", e);
+        close();
+        manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e));
+      }
+    }, shouldBackoff);
   }
 
   public void onMessage(String messageSource) {
@@ -16,8 +16,6 @@
 package io.fabric8.kubernetes.client.dsl.internal;
 
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -43,7 +41,7 @@ abstract class WatcherWebSocketListener<T> extends WebSocketListener {
    * True if an onOpen callback was received on the first connect attempt, i.e. the watch was successfully started.
    */
   private final AtomicBoolean started = new AtomicBoolean(false);
-  private final AtomicBoolean reconnectPending = new AtomicBoolean(false);
+
   /**
    * Blocking queue for startup exceptions.
    */
@@ -152,40 +150,17 @@ public void onClosed(WebSocket webSocket, int code, String reason) {
   }
 
   private void scheduleReconnect() {
-    logger.debug("Submitting reconnect task to the executor");
-    // make sure that whichever thread calls this method, the tasks are
-    // performed serially in the executor
-    manager.submit(new NamedRunnable("scheduleReconnect") {
-      @Override
-      public void execute() {
-        if (!reconnectPending.compareAndSet(false, true)) {
-          logger.debug("Reconnect already scheduled");
-          return;
-        }
-        webSocketRef.set(null);
-        try {
-          // actual reconnect only after the back-off time has passed, without
-          // blocking the thread
-          logger.debug("Scheduling reconnect task");
-          manager.schedule(new NamedRunnable("reconnectAttempt") {
-            @Override
-            public void execute() {
-              try {
-                manager.runWatch();
-                reconnectPending.set(false);
-              } catch (Exception e) {
-                // An unexpected error occurred and we didn't even get an onFailure callback.
-                logger.error("Exception in reconnect", e);
-                webSocketRef.set(null);
-                manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e));
-                manager.close();
-              }
-            }
-          }, manager.nextReconnectInterval(), TimeUnit.MILLISECONDS);
-        } catch (RejectedExecutionException e) {
-          reconnectPending.set(false);
-        }
-      }
-    });
+    webSocketRef.set(null);
+    manager.scheduleReconnect(() -> {
+      try {
+        manager.runWatch();
+      } catch (Exception e) {
+        // An unexpected error occurred and we didn't even get an onFailure callback.
+        logger.error("Exception in reconnect", e);
+        webSocketRef.set(null);
+        manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e));
+        manager.close();
+      }
+    }, true);
   }
 }
@@ -19,7 +19,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
 
 /**
  * Calls the resync function of store interface which is always implemented
@@ -30,9 +30,9 @@ public class ResyncRunnable<T> implements Runnable {
   private static final Logger log = LoggerFactory.getLogger(ResyncRunnable.class);
 
   private Store<T> store;
-  private BooleanSupplier shouldResyncFunc;
+  private Supplier<Boolean> shouldResyncFunc;
 
-  public ResyncRunnable(Store<T> store, BooleanSupplier shouldResyncFunc) {
+  public ResyncRunnable(Store<T> store, Supplier<Boolean> shouldResyncFunc) {
     this.store = store;
     this.shouldResyncFunc = shouldResyncFunc;
   }
@@ -42,9 +42,7 @@ public void run() {
       log.debug("ResyncRunnable#resync .. ..");
     }
 
-    // if there is a concern that this processing could overwhelm the single
-    // thread, then hand this off to the common pool
-    if (shouldResyncFunc == null || shouldResyncFunc.getAsBoolean()) {
+    if (shouldResyncFunc == null || shouldResyncFunc.get()) {
       if (log.isDebugEnabled()) {
         log.debug("ResyncRunnable#force resync");
       }
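The switch from BooleanSupplier to Supplier&lt;Boolean&gt; lets the resync gate be any method reference that returns boolean or Boolean. A hypothetical wiring sketch; everything here except ResyncRunnable, Store, and their constructor shape is invented, and the real call site (DefaultSharedIndexInformer) is not part of this excerpt:

```java
// Hypothetical wiring; assumed to sit alongside ResyncRunnable and Store,
// so the fabric8 imports are omitted. Only the ResyncRunnable constructor
// shape comes from the diff.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class ResyncWiringSketch<T> {
  void wire(Store<T> store, Supplier<Boolean> shouldResync,
      long resyncPeriodMillis) {
    ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    // method references such as informer::shouldResync now fit directly,
    // whether they return boolean or Boolean
    ResyncRunnable<T> resync = new ResyncRunnable<>(store, shouldResync);
    scheduler.scheduleWithFixedDelay(resync, resyncPeriodMillis,
        resyncPeriodMillis, TimeUnit.MILLISECONDS);
  }
}
```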
@@ -56,12 +56,21 @@ public class SharedInformerFactory extends BaseOperation {
   private final Map<String, Future> startedInformers = new HashMap<>();
 
   private final ExecutorService informerExecutor;
-  private final SharedScheduler resyncExecutor = new SharedScheduler();
 
   private final BaseOperation baseOperation;
 
   private final ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners = new ConcurrentLinkedQueue<>();
 
+  private boolean allowShutdown = true;
+
+  public SharedInformerFactory(OkHttpClient okHttpClient, Config configuration) {
+    // ideally this should be bounded. The current implication is that there
+    // can be 1 thread used (not dedicated to) per informer - which
+    // could be problematic for a large number of informers. however
+    // there is already a superseding issue there of thread utilization by okhttp
+    this(Utils.getCommonExecutorSerive(), okHttpClient, configuration);
+    this.allowShutdown = false;
+  }
   /**
    * Constructor with thread pool specified.
    *
@@ -234,7 +243,7 @@ private synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>
         context = context.withIsNamespaceConfiguredFromGlobalConfig(false);
       }
     }
-    SharedIndexInformer<T> informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, informerExecutor, resyncExecutor);
+    SharedIndexInformer<T> informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, informerExecutor);
     this.informers.put(getInformerKey(context), informer);
     return informer;
   }
@@ -322,7 +331,7 @@ public synchronized void stopAllRegisteredInformers(boolean shutDownThreadPool)
         informer.stop();
       }
     });
-    if (shutDownThreadPool) {
+    if (shutDownThreadPool && allowShutdown) {
       informerExecutor.shutdown();
     }
   }
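The new allowShutdown flag encodes an ownership rule: a factory that borrowed the common executor must never shut it down, while a factory handed a caller-owned pool still may. Condensed into a standalone sketch (the stand-in pool and class name are invented; the real signatures are in the diff above):

```java
// Standalone sketch; the stand-in pool and class name are invented, the
// allowShutdown logic mirrors the diff above.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FactorySketch {
  // stands in for the client-wide pool behind Utils.getCommonExecutorSerive()
  private static final ExecutorService COMMON = Executors.newCachedThreadPool();

  private final ExecutorService informerExecutor;
  private boolean allowShutdown = true;

  // Borrowed shared pool: shutdown requests are ignored so one factory
  // cannot kill threads that other factories and watches still rely on.
  FactorySketch() {
    this(COMMON);
    this.allowShutdown = false;
  }

  // Caller-owned pool: shutting it down on stop remains permitted.
  FactorySketch(ExecutorService informerExecutor) {
    this.informerExecutor = informerExecutor;
  }

  void stopAllRegisteredInformers(boolean shutDownThreadPool) {
    // ... stop informers first ...
    if (shutDownThreadPool && allowShutdown) {
      informerExecutor.shutdown();
    }
  }
}
```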