Skip to content

Commit

Permalink
Fix #3016: Use scheduleWithFixedDelay for resync task execution
Browse files Browse the repository at this point in the history
+ When using `resyncExecutor.scheduleAtFixedRate` we're seeing inconsistent
  resync intervals since it doesn't wait for previous task to be
  terminated. Hence using `resyncExecutor.scheduleWithFixedDelay` instead.
+ Remove unused reflectExecutor from Controller and Reflector since we
  don't use scheduled reflector relist since #2964
  • Loading branch information
rohanKanojia committed Apr 22, 2021
1 parent 33db23d commit b106bfb
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Fix #3027: fix NPE when sorting events in KubernetesResourceUtil
* Fix #2994: updating the SharedIndexInformer indexer state for a delete event generated by resync
* Fix #2992: allowing Watch auto-reconnect for shared informers
* Fix #3016: Use scheduleWithFixedDelay for resync task execution

#### Improvements
* Fix #2910: Move crd-generator tests from kubernetes-itests to kubernetes-tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<

private final Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc;

private final ScheduledExecutorService reflectExecutor;

private final ScheduledExecutorService resyncExecutor;

private ScheduledFuture resyncFuture;
Expand All @@ -73,7 +71,7 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<

private final Class<T> apiTypeClass;

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners, ScheduledExecutorService resyncExecutor) {
this.queue = queue;
this.listerWatcher = listerWatcher;
this.apiTypeClass = apiTypeClass;
Expand All @@ -86,24 +84,19 @@ public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L>
this.operationContext = context;
this.eventListeners = eventListeners;

// Starts one daemon thread for reflector
this.reflectExecutor = Executors.newSingleThreadScheduledExecutor();

// Starts one daemon thread for resync
this.resyncExecutor = Executors.newSingleThreadScheduledExecutor();
this.resyncExecutor = resyncExecutor;
initReflector();
}

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
this(apiTypeClass, queue, listerWatcher, processFunc, resyncFunc, fullResyncPeriod, context, eventListeners, Executors.newSingleThreadScheduledExecutor());
}

public void run() {
log.info("informer#Controller: ready to run resync and reflector runnable");

// Start the resync runnable
if (fullResyncPeriod > 0) {
ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc);
resyncFuture = resyncExecutor.scheduleAtFixedRate(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS);
} else {
log.info("informer#Controller: resync skipped due to 0 full resync period");
}
scheduleResync();

try {
reflector.listAndWatch();
Expand All @@ -116,13 +109,22 @@ public void run() {
}
}

void scheduleResync() {
// Start the resync runnable
if (fullResyncPeriod > 0) {
ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc);
resyncFuture = resyncExecutor.scheduleWithFixedDelay(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS);
} else {
log.info("informer#Controller: resync skipped due to 0 full resync period");
}
}

/**
* Stops the resync thread pool first, then stops the reflector.
*/
public void stop() {
synchronized (this) {
reflector.stop();
reflectExecutor.shutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -40,24 +38,18 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private final Store<T> store;
private final OperationContext operationContext;
private final long resyncPeriodMillis;
private final ScheduledExecutorService resyncExecutor;
private final ReflectorWatcher<T> watcher;
private final AtomicBoolean isActive;
private final AtomicBoolean isWatcherStarted;
private final AtomicReference<Watch> watch;

public Reflector(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, Store store, OperationContext operationContext, long resyncPeriodMillis) {
this(apiTypeClass, listerWatcher, store, operationContext, resyncPeriodMillis, Executors.newSingleThreadScheduledExecutor());
}

public Reflector(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, Store store, OperationContext operationContext, long resyncPeriodMillis, ScheduledExecutorService resyncExecutor) {
this.apiTypeClass = apiTypeClass;
this.listerWatcher = listerWatcher;
this.store = store;
this.operationContext = operationContext;
this.resyncPeriodMillis = resyncPeriodMillis;
this.lastSyncResourceVersion = new AtomicReference<>();
this.resyncExecutor = resyncExecutor;
this.watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::startWatcher, this::reListAndSync);
this.isActive = new AtomicBoolean(true);
this.isWatcherStarted = new AtomicBoolean(false);
Expand Down Expand Up @@ -96,9 +88,6 @@ private void reListAndSync() {
log.debug("Listing items ({}) for resource {} v{}", list.getItems().size(), apiTypeClass, latestResourceVersion);
lastSyncResourceVersion.set(latestResourceVersion);
store.replace(list.getItems(), latestResourceVersion);
if (!isActive.get()) {
resyncExecutor.shutdown();
}
}

private void startWatcher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
*/
package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodListBuilder;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import io.fabric8.kubernetes.client.informers.SharedInformerEventListener;
Expand All @@ -25,9 +29,15 @@
import org.mockito.Mockito;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

class ControllerTest {
private DeltaFIFO<Pod> deltaFIFO = Mockito.mock(DeltaFIFO.class, Mockito.RETURNS_DEEP_STUBS);
Expand Down Expand Up @@ -69,4 +79,21 @@ void testControllerCreationWithResyncPeriodZero() {
// Then
assertEquals(0L, controller.getReflector().getResyncPeriodMillis());
}

@Test
void testControllerRunSchedulesResyncTaskWithFixedDelay() {
// Given
ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class, Mockito.RETURNS_DEEP_STUBS);
Controller<Pod, PodList> controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher,
simpleEntries -> {
},
() -> true,
1L, operationContext, eventListeners, scheduledExecutorService);

// When
controller.scheduleResync();

// Then
verify(scheduledExecutorService, times(1)).scheduleWithFixedDelay(any(), eq(1L), eq(1L), any());
}
}

0 comments on commit b106bfb

Please sign in to comment.