From 51badc4ad368aa48cfea8b22b8de5d92cf76b328 Mon Sep 17 00:00:00 2001 From: Rohan Kumar Date: Fri, 30 Apr 2021 15:31:06 +0530 Subject: [PATCH] Fix #3016: Use scheduleWithFixedDelay for resync task execution + When using `resyncExecutor.scheduleAtFixedRate` we're seeing inconsistent resync intervals since it doesn't wait for previous task to be terminated. Hence using `resyncExecutor.scheduleWithFixedDelay` instead. + Remove unused reflectExecutor from Controller and Reflector since we don't use scheduled reflector relist since #2964 --- CHANGELOG.md | 1 + .../client/informers/cache/Controller.java | 32 ++++++++++--------- .../client/informers/cache/Reflector.java | 5 --- .../informers/cache/ControllerTest.java | 22 +++++++++++++ 4 files changed, 40 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60686e3c4c7..b96b610a2ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * Fix #2989: serialization will generate valid yaml when using subtypes * Fix #2996: Generating CRDs from the API should now properly work * Fix #3000: Set `no_proxy` in the okhttp builder in case the `proxy_url` is null +* Fix #3016: Use scheduleWithFixedDelay for resync task execution * Fix #2991: reduced the level of ReflectWatcher event recieved log * Fix #3027: fix NPE when sorting events in KubernetesResourceUtil * Fix #2994: updating the SharedIndexInformer indexer state for a delete event generated by resync diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java index 20b15777f2a..20136a2fa59 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java @@ -61,8 +61,6 @@ public class Controller>> processFunc; - private final ScheduledExecutorService reflectExecutor; - private final ScheduledExecutorService resyncExecutor; private ScheduledFuture resyncFuture; @@ -73,7 +71,7 @@ public class Controller apiTypeClass; - public Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher listerWatcher, Consumer>> processFunc, Supplier resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue eventListeners) { + Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher listerWatcher, Consumer>> processFunc, Supplier resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue eventListeners, ScheduledExecutorService resyncExecutor) { this.queue = queue; this.listerWatcher = listerWatcher; this.apiTypeClass = apiTypeClass; @@ -86,24 +84,19 @@ public Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher this.operationContext = context; this.eventListeners = eventListeners; - // Starts one daemon thread for reflector - this.reflectExecutor = Executors.newSingleThreadScheduledExecutor(); - // Starts one daemon thread for resync - this.resyncExecutor = Executors.newSingleThreadScheduledExecutor(); + this.resyncExecutor = resyncExecutor; initReflector(); } + public Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher listerWatcher, Consumer>> processFunc, Supplier resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue eventListeners) { + this(apiTypeClass, queue, listerWatcher, processFunc, resyncFunc, fullResyncPeriod, context, eventListeners, Executors.newSingleThreadScheduledExecutor()); + } + public void run() { log.info("informer#Controller: ready to run resync and reflector runnable"); - // Start the resync runnable - if (fullResyncPeriod > 0) { - ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc); - resyncFuture = resyncExecutor.scheduleAtFixedRate(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS); - } else { - log.info("informer#Controller: resync skipped due to 0 full resync period"); - } + scheduleResync(); try { reflector.listAndWatch(); @@ -116,13 +109,22 @@ public void run() { } } + void scheduleResync() { + // Start the resync runnable + if (fullResyncPeriod > 0) { + ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc); + resyncFuture = resyncExecutor.scheduleWithFixedDelay(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS); + } else { + log.info("informer#Controller: resync skipped due to 0 full resync period"); + } + } + /** * Stops the resync thread pool first, then stops the reflector. */ public void stop() { synchronized (this) { reflector.stop(); - reflectExecutor.shutdown(); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java index 08444135b67..fb70059f747 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java @@ -40,7 +40,6 @@ public class Reflector store; private final OperationContext operationContext; private final long resyncPeriodMillis; - private final ScheduledExecutorService resyncExecutor; private final ReflectorWatcher watcher; private final AtomicBoolean isActive; private final AtomicBoolean isWatcherStarted; @@ -57,7 +56,6 @@ public Reflector(Class apiTypeClass, ListerWatcher listerWatcher, Store this.operationContext = operationContext; this.resyncPeriodMillis = resyncPeriodMillis; this.lastSyncResourceVersion = new AtomicReference<>(); - this.resyncExecutor = resyncExecutor; this.watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::startWatcher, this::reListAndSync); this.isActive = new AtomicBoolean(true); this.isWatcherStarted = new AtomicBoolean(false); @@ -96,9 +94,6 @@ private void reListAndSync() { log.debug("Listing items ({}) for resource {} v{}", list.getItems().size(), apiTypeClass, latestResourceVersion); lastSyncResourceVersion.set(latestResourceVersion); store.replace(list.getItems(), latestResourceVersion); - if (!isActive.get()) { - resyncExecutor.shutdown(); - } } private void startWatcher() { diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java index 97c647aede0..2dc54ee4b50 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java @@ -25,9 +25,14 @@ import org.mockito.Mockito; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledExecutorService; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; class ControllerTest { private DeltaFIFO deltaFIFO = Mockito.mock(DeltaFIFO.class, Mockito.RETURNS_DEEP_STUBS); @@ -69,4 +74,21 @@ void testControllerCreationWithResyncPeriodZero() { // Then assertEquals(0L, controller.getReflector().getResyncPeriodMillis()); } + + @Test + void testControllerRunSchedulesResyncTaskWithFixedDelay() { + // Given + ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class, Mockito.RETURNS_DEEP_STUBS); + Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, + simpleEntries -> { + }, + () -> true, + 1L, operationContext, eventListeners, scheduledExecutorService); + + // When + controller.scheduleResync(); + + // Then + verify(scheduledExecutorService, times(1)).scheduleWithFixedDelay(any(), eq(1L), eq(1L), any()); + } }