Skip to content

Commit

Permalink
Fix fabric8io#3016: Use scheduleWithFixedDelay for resync task execution
Browse files Browse the repository at this point in the history
+ When using `resyncExecutor.scheduleAtFixedRate` we're seeing inconsistent
  resync intervals since it doesn't wait for previous task to be
  terminated. Hence using `resyncExecutor.scheduleWithFixedDelay` instead.
+ Remove unused reflectExecutor from Controller and Reflector since we
  don't use scheduled reflector relist since fabric8io#2964
  • Loading branch information
rohanKanojia committed Apr 30, 2021
1 parent 33db23d commit 51badc4
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Fix #2989: serialization will generate valid yaml when using subtypes
* Fix #2996: Generating CRDs from the API should now properly work
* Fix #3000: Set `no_proxy` in the okhttp builder in case the `proxy_url` is null
* Fix #3016: Use scheduleWithFixedDelay for resync task execution
* Fix #2991: reduced the level of ReflectWatcher event recieved log
* Fix #3027: fix NPE when sorting events in KubernetesResourceUtil
* Fix #2994: updating the SharedIndexInformer indexer state for a delete event generated by resync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<

private final Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc;

private final ScheduledExecutorService reflectExecutor;

private final ScheduledExecutorService resyncExecutor;

private ScheduledFuture resyncFuture;
Expand All @@ -73,7 +71,7 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<

private final Class<T> apiTypeClass;

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners, ScheduledExecutorService resyncExecutor) {
this.queue = queue;
this.listerWatcher = listerWatcher;
this.apiTypeClass = apiTypeClass;
Expand All @@ -86,24 +84,19 @@ public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L>
this.operationContext = context;
this.eventListeners = eventListeners;

// Starts one daemon thread for reflector
this.reflectExecutor = Executors.newSingleThreadScheduledExecutor();

// Starts one daemon thread for resync
this.resyncExecutor = Executors.newSingleThreadScheduledExecutor();
this.resyncExecutor = resyncExecutor;
initReflector();
}

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
this(apiTypeClass, queue, listerWatcher, processFunc, resyncFunc, fullResyncPeriod, context, eventListeners, Executors.newSingleThreadScheduledExecutor());
}

public void run() {
log.info("informer#Controller: ready to run resync and reflector runnable");

// Start the resync runnable
if (fullResyncPeriod > 0) {
ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc);
resyncFuture = resyncExecutor.scheduleAtFixedRate(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS);
} else {
log.info("informer#Controller: resync skipped due to 0 full resync period");
}
scheduleResync();

try {
reflector.listAndWatch();
Expand All @@ -116,13 +109,22 @@ public void run() {
}
}

void scheduleResync() {
// Start the resync runnable
if (fullResyncPeriod > 0) {
ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc);
resyncFuture = resyncExecutor.scheduleWithFixedDelay(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS);
} else {
log.info("informer#Controller: resync skipped due to 0 full resync period");
}
}

/**
* Stops the resync thread pool first, then stops the reflector.
*/
public void stop() {
synchronized (this) {
reflector.stop();
reflectExecutor.shutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private final Store<T> store;
private final OperationContext operationContext;
private final long resyncPeriodMillis;
private final ScheduledExecutorService resyncExecutor;
private final ReflectorWatcher<T> watcher;
private final AtomicBoolean isActive;
private final AtomicBoolean isWatcherStarted;
Expand All @@ -57,7 +56,6 @@ public Reflector(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, Store
this.operationContext = operationContext;
this.resyncPeriodMillis = resyncPeriodMillis;
this.lastSyncResourceVersion = new AtomicReference<>();
this.resyncExecutor = resyncExecutor;
this.watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::startWatcher, this::reListAndSync);
this.isActive = new AtomicBoolean(true);
this.isWatcherStarted = new AtomicBoolean(false);
Expand Down Expand Up @@ -96,9 +94,6 @@ private void reListAndSync() {
log.debug("Listing items ({}) for resource {} v{}", list.getItems().size(), apiTypeClass, latestResourceVersion);
lastSyncResourceVersion.set(latestResourceVersion);
store.replace(list.getItems(), latestResourceVersion);
if (!isActive.get()) {
resyncExecutor.shutdown();
}
}

private void startWatcher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@
import org.mockito.Mockito;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

class ControllerTest {
private DeltaFIFO<Pod> deltaFIFO = Mockito.mock(DeltaFIFO.class, Mockito.RETURNS_DEEP_STUBS);
Expand Down Expand Up @@ -69,4 +74,21 @@ void testControllerCreationWithResyncPeriodZero() {
// Then
assertEquals(0L, controller.getReflector().getResyncPeriodMillis());
}

@Test
void testControllerRunSchedulesResyncTaskWithFixedDelay() {
// Given
ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class, Mockito.RETURNS_DEEP_STUBS);
Controller<Pod, PodList> controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher,
simpleEntries -> {
},
() -> true,
1L, operationContext, eventListeners, scheduledExecutorService);

// When
controller.scheduleResync();

// Then
verify(scheduledExecutorService, times(1)).scheduleWithFixedDelay(any(), eq(1L), eq(1L), any());
}
}

0 comments on commit 51badc4

Please sign in to comment.