Skip to content

Commit

Permalink
fix fabric8io#3143 adding a more general listener onException
Browse files Browse the repository at this point in the history
also adding a method to get the api class from the informer
  • Loading branch information
shawkins authored and manusa committed May 24, 2021
1 parent 647378f commit 4e8c130
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 33 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
#### Bugs

#### Improvements
* Fix #3135 added mock crud support for patch status, and will return exceptions for unsupported patch types
* Fix #3135: added mock crud support for patch status, and will return exceptions for unsupported patch types
* Fix #3072: various changes to refine how threads are handled by informers. Note that the SharedInformer.run call is now blocking when starting the informer.
* Fix #3143: a new SharedInformerEventListener.onException(SharedIndexInformer, Exception) method is available to determine which informer could not start.

#### Dependency Upgrade

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* <br>Effectively creates a derived single thread executor
*/
public class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Queue<Runnable> tasks = new ArrayDeque<>();
final Executor executor;
Runnable active;

Expand All @@ -35,13 +35,11 @@ public SerialExecutor(Executor executor) {
}

public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
tasks.offer(() -> {
try {
r.run();
} finally {
scheduleNext();
}
});
if (active == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ public interface SharedInformer<T> {
* Return true if the informer is running
*/
boolean isRunning();

/**
* Return the class this informer is watching
*/
Class<T> getApiTypeClass();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@

public interface SharedInformerEventListener {
void onException(Exception exception);

default void onException(SharedIndexInformer<?> informer, Exception e) {
onException(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import okhttp3.OkHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* SharedInformerFactory class constructs and caches informers for api types.
Expand All @@ -48,6 +50,7 @@
* which is ported from offical go client https://github.com/kubernetes/client-go/blob/master/informers/factory.go
*/
public class SharedInformerFactory extends BaseOperation {
private static final Logger log = LoggerFactory.getLogger(SharedInformerFactory.class);
private final Map<String, SharedIndexInformer> informers = new HashMap<>();

private final Map<String, Future> startedInformers = new HashMap<>();
Expand Down Expand Up @@ -231,7 +234,7 @@ private synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>
context = context.withIsNamespaceConfiguredFromGlobalConfig(false);
}
}
SharedIndexInformer<T> informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, eventListeners, informerExecutor, resyncExecutor);
SharedIndexInformer<T> informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, informerExecutor, resyncExecutor);
this.informers.put(getInformerKey(context), informer);
return informer;
}
Expand Down Expand Up @@ -285,8 +288,15 @@ public synchronized void startAllRegisteredInformers() {

if (!informerExecutor.isShutdown()) {
informers.forEach(
(informerType, informer) ->
startedInformers.computeIfAbsent(informerType, key -> informerExecutor.submit(informer::run)));
(informerType, informer) -> startedInformers.computeIfAbsent(informerType,
key -> informerExecutor.submit(() -> {
try {
informer.run();
} catch (RuntimeException e) {
this.eventListeners.forEach(listener -> listener.onException(informer, e));
log.warn("Informer start did not complete", e);
}
})));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.fabric8.kubernetes.client.informers.ResyncRunnable;
import io.fabric8.kubernetes.client.informers.SerialExecutor;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerEventListener;
import io.fabric8.kubernetes.client.informers.SharedScheduler;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
Expand All @@ -34,7 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
Expand All @@ -58,7 +56,6 @@ public class DefaultSharedIndexInformer<T extends HasMetadata, L extends Kuberne
// value).
private long defaultEventHandlerResyncPeriod;

private final Collection<SharedInformerEventListener> eventListeners;
private final SharedScheduler resyncExecutor;
private final Reflector<T, L> reflector;
private final Class<T> apiTypeClass;
Expand All @@ -74,13 +71,12 @@ public class DefaultSharedIndexInformer<T extends HasMetadata, L extends Kuberne

private ScheduledFuture<?> resyncFuture;

public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod, OperationContext context, Collection<SharedInformerEventListener> eventListeners, Executor informerExecutor, SharedScheduler resyncExecutor) {
public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod, OperationContext context, Executor informerExecutor, SharedScheduler resyncExecutor) {
if (resyncPeriod < 0) {
throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value");
}
this.resyncCheckPeriodMillis = resyncPeriod;
this.defaultEventHandlerResyncPeriod = resyncPeriod;
this.eventListeners = eventListeners;
this.resyncExecutor = resyncExecutor;
this.apiTypeClass = apiTypeClass;

Expand Down Expand Up @@ -136,8 +132,8 @@ public void addEventHandlerWithResyncPeriod(ResourceEventHandler<T> handler, lon
}

List<T> objectList = this.indexer.list();
for (Object item : objectList) {
listener.add(new ProcessorListener.AddNotification(item));
for (T item : objectList) {
listener.add(new ProcessorListener.AddNotification<>(item));
}
}

Expand All @@ -155,17 +151,11 @@ public void run() {
return;
}

try {
log.info("informer#Controller: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis);
log.info("informer#Controller: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis);

scheduleResync(processor::shouldResync);

reflector.listSyncAndWatch();
} catch (RuntimeException exception) {
log.warn("Informer start did not complete", exception);
this.eventListeners.forEach(listener -> listener.onException(exception));
throw exception;
}
scheduleResync(processor::shouldResync);

reflector.listSyncAndWatch();
// stop called while run is called could be ineffective, check for it afterwards
synchronized (this) {
if (stopped) {
Expand Down Expand Up @@ -200,7 +190,7 @@ public void addIndexers(Map<String, Function<T, List<String>>> indexers) {
}

@Override
public Indexer getIndexer() {
public Indexer<T> getIndexer() {
return this.indexer;
}

Expand Down Expand Up @@ -237,5 +227,10 @@ public long getFullResyncPeriod() {
ScheduledFuture<?> getResyncFuture() {
return resyncFuture;
}

@Override
public Class<T> getApiTypeClass() {
return apiTypeClass;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import static org.awaitility.Awaitility.await;

public class SharedSchedulerTest {
class SharedSchedulerTest {

@Test
void testAutoShutdown() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand All @@ -44,7 +43,7 @@ private abstract static class AbstractPodListerWatcher implements ListerWatcher<
DefaultSharedIndexInformer<Pod, PodList> defaultSharedIndexInformer;

private DefaultSharedIndexInformer<Pod, PodList> createDefaultSharedIndexInformer(long resyncPeriod) {
defaultSharedIndexInformer = new DefaultSharedIndexInformer<>(Pod.class, listerWatcher, resyncPeriod, operationContext, Collections.emptyList(), Runnable::run, sharedScheduler);
defaultSharedIndexInformer = new DefaultSharedIndexInformer<>(Pod.class, listerWatcher, resyncPeriod, operationContext, Runnable::run, sharedScheduler);
return defaultSharedIndexInformer;
}

Expand Down

0 comments on commit 4e8c130

Please sign in to comment.