Skip to content

Commit

Permalink
refactor: Removing the deltafifo to simplify the informer code (fabri…
Browse files Browse the repository at this point in the history
…c8io#3061)

* removing the deltafifo to simplify the informer code

* better reuse of the actual store replace function

this ensures that the cache state is fully updated by the time any of
the notifications are processed
  • Loading branch information
shawkins authored and ddl-audi committed May 14, 2021
1 parent b947304 commit b0c06a5
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 818 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* Fix #3015: Thread interruption in a nominal case (like closing the client) are now logged in debug
* Fix #3057: Removed debug calls for CustomResource during deserialization
* Fix #3050: More enforcement of the informer lifecycle
* Fix #3061: Removed the deltafifo from the informer logic

#### Dependency Upgrade
* Fix #2979: Update Kubernetes Model to v1.21.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,6 @@ public void addIndexFunc(String indexName, Function<T, List<String>> indexFunc)
* @return the key
*/
public static <T> String deletionHandlingMetaNamespaceKeyFunc(T object) {
if (object instanceof DeltaFIFO.DeletedFinalStateUnknown) {
DeltaFIFO.DeletedFinalStateUnknown deleteObj = (DeltaFIFO.DeletedFinalStateUnknown) object;
return deleteObj.getKey();
}
return metaNamespaceKeyFunc(object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractMap;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
Expand All @@ -48,19 +45,14 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<
*/
private final long fullResyncPeriod;

/**
* Queue stores deltas produced by reflector.
*/
private final DeltaFIFO<T> queue;
private final Store<T> store;

private final ListerWatcher<T, L> listerWatcher;

private Reflector<T, L> reflector;

private final Supplier<Boolean> resyncFunc;

private final Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc;

private final ScheduledExecutorService resyncExecutor;

private ScheduledFuture resyncFuture;
Expand All @@ -71,13 +63,10 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<

private final Class<T> apiTypeClass;

private volatile boolean running;

Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners, ScheduledExecutorService resyncExecutor) {
this.queue = queue;
Controller(Class<T> apiTypeClass, Store<T> store, ListerWatcher<T, L> listerWatcher, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners, ScheduledExecutorService resyncExecutor) {
this.store = store;
this.listerWatcher = listerWatcher;
this.apiTypeClass = apiTypeClass;
this.processFunc = processFunc;
this.resyncFunc = resyncFunc;
if (fullResyncPeriod < 0) {
throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value");
Expand All @@ -86,39 +75,32 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<
this.operationContext = context;
this.eventListeners = eventListeners;

// Starts one daemon thread for resync
this.reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext);
this.reflector = new Reflector<>(apiTypeClass, listerWatcher, store, operationContext);
this.resyncExecutor = resyncExecutor;
}

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
this(apiTypeClass, queue, listerWatcher, processFunc, resyncFunc, fullResyncPeriod, context, eventListeners, Executors.newSingleThreadScheduledExecutor());
public Controller(Class<T> apiTypeClass, Store<T> store, ListerWatcher<T, L> listerWatcher, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
this(apiTypeClass, store, listerWatcher, resyncFunc, fullResyncPeriod, context, eventListeners, Executors.newSingleThreadScheduledExecutor());
}

public void run() {
log.info("informer#Controller: ready to run resync and reflector runnable");

scheduleResync();

try {
running = true;
log.info("Started Reflector watch for {}", apiTypeClass);
reflector.listSyncAndWatch();

// Start the process loop
this.processLoop();
} catch (Exception exception) {
log.warn("Reflector list-watching job exiting because the thread-pool is shutting down", exception);
this.eventListeners.forEach(listener -> listener.onException(exception));
} finally {
running = false;
}
}

void scheduleResync() {
// Start the resync runnable
if (fullResyncPeriod > 0) {
ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc);
ResyncRunnable resyncRunnable = new ResyncRunnable(store, resyncFunc);
resyncFuture = resyncExecutor.scheduleWithFixedDelay(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS);
} else {
log.info("informer#Controller: resync skipped due to 0 full resync period");
Expand All @@ -143,7 +125,7 @@ public void stop() {
* @return boolean value about queue sync status
*/
public boolean hasSynced() {
return this.queue.hasSynced();
return this.store.hasSynced();
}

/**
Expand All @@ -158,30 +140,12 @@ Reflector<T, L> getReflector() {
return reflector;
}

/**
* drains the work queue.
*/
private void processLoop() throws Exception {
while (!Thread.currentThread().isInterrupted()) {
try {
this.queue.pop(this.processFunc);
} catch (InterruptedException t) {
log.debug("DefaultController#processLoop got interrupted: {}", t.getMessage());
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
log.error("DefaultController#processLoop recovered from crashing {} ", e.getMessage(), e);
throw e;
}
}
}

ScheduledExecutorService getResyncExecutor() {
return this.resyncExecutor;
}

public boolean isRunning() {
return running && this.reflector.isRunning();
return this.reflector.isRunning();
}

public long getFullResyncPeriod() {
Expand Down
Loading

0 comments on commit b0c06a5

Please sign in to comment.