diff --git a/CHANGELOG.md b/CHANGELOG.md index a5a8c85c2e7..a1b631dc795 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ * Fix #3000: Set `no_proxy` in the okhttp builder in case the `proxy_url` is null * Fix #3011: properly handle enum types for additional printer columns * Fix #3020: annotations should now properly have their associated values when processing CRDs from the API +* Fix #3016: Use scheduleWithFixedDelay for resync task execution +* Fix #2991: reduced the level of ReflectWatcher event recieved log * Fix #3027: fix NPE when sorting events in KubernetesResourceUtil * Fix missing entry for Trigger in TektonTriggersResourceMappingProvider diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java index c1c3eba24df..d8a356d8eac 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java @@ -73,7 +73,7 @@ public class Controller apiTypeClass, DeltaFIFO queue, ListerWatcher listerWatcher, Consumer>> processFunc, Supplier resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue eventListeners) { + Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher listerWatcher, Consumer>> processFunc, Supplier resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue eventListeners, ScheduledExecutorService resyncExecutor) { this.queue = queue; this.listerWatcher = listerWatcher; this.apiTypeClass = apiTypeClass; @@ -87,21 +87,18 @@ public Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher this.eventListeners = eventListeners; // Starts one daemon thread for resync - this.resyncExecutor = Executors.newSingleThreadScheduledExecutor(); this.reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext); + this.resyncExecutor = resyncExecutor; + } + + public Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher listerWatcher, Consumer>> processFunc, Supplier resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue eventListeners) { + this(apiTypeClass, queue, listerWatcher, processFunc, resyncFunc, fullResyncPeriod, context, eventListeners, Executors.newSingleThreadScheduledExecutor()); } public void run() { log.info("informer#Controller: ready to run resync and reflector runnable"); - // Start the resync runnable - if (fullResyncPeriod > 0) { - ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc); - if(!resyncExecutor.isShutdown()) { - resyncFuture = resyncExecutor.scheduleAtFixedRate(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS); - } - } else { - log.info("informer#Controller: resync skipped due to 0 full resync period"); - } + + scheduleResync(); try { running = true; @@ -117,6 +114,16 @@ public void run() { } } + void scheduleResync() { + // Start the resync runnable + if (fullResyncPeriod > 0) { + ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc); + resyncFuture = resyncExecutor.scheduleWithFixedDelay(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS); + } else { + log.info("informer#Controller: resync skipped due to 0 full resync period"); + } + } + /** * Stops the resync thread pool first, then stops the reflector. */ diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java index 7a5c073d30e..12c14806c6a 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java @@ -16,9 +16,9 @@ package io.fabric8.kubernetes.client.informers.cache; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.junit.Assert.assertNotNull; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.concurrent.ConcurrentLinkedQueue; @@ -37,13 +37,18 @@ import io.fabric8.kubernetes.client.informers.ListerWatcher; import io.fabric8.kubernetes.client.informers.SharedInformerEventListener; -class ControllerTest { - private DeltaFIFO deltaFIFO = Mockito.mock(DeltaFIFO.class, Mockito.RETURNS_DEEP_STUBS); - private abstract class AbstractPodListerWatcher implements ListerWatcher {}; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; - private ListerWatcher listerWatcher = Mockito.mock(AbstractPodListerWatcher.class, Mockito.RETURNS_DEEP_STUBS); - private OperationContext operationContext = Mockito.mock(OperationContext.class, Mockito.RETURNS_DEEP_STUBS); - private ConcurrentLinkedQueue eventListeners = Mockito.mock(ConcurrentLinkedQueue.class, Mockito.RETURNS_DEEP_STUBS); +class ControllerTest { + private final DeltaFIFO deltaFIFO = Mockito.mock(DeltaFIFO.class, Mockito.RETURNS_DEEP_STUBS); + private abstract static class AbstractPodListerWatcher implements ListerWatcher {}; + private static final Long WAIT_TIME = 500L; + private final ListerWatcher listerWatcher = Mockito.mock(AbstractPodListerWatcher.class, Mockito.RETURNS_DEEP_STUBS); + private final OperationContext operationContext = Mockito.mock(OperationContext.class, Mockito.RETURNS_DEEP_STUBS); + private final ConcurrentLinkedQueue eventListeners = Mockito.mock(ConcurrentLinkedQueue.class, Mockito.RETURNS_DEEP_STUBS); @Test @DisplayName("Controller initialized with resync period greater than zero should use provided resync period") @@ -116,7 +121,7 @@ void testControllerHasSync() throws InterruptedException { void testControllerRunWithInterruptedThread() throws InterruptedException { // Given + When // used to be able to interrupt the thread in the lambda - ThreadWrapper controllerThreadWrapper = new ThreadWrapper(); + ThreadWrapper controllerThreadWrapper = new ThreadWrapper(); long fullResyncPeriod = 1L; int numberOfResyncs = 1; final CountDownLatch countDown = new CountDownLatch(numberOfResyncs); @@ -130,7 +135,7 @@ void testControllerRunWithInterruptedThread() throws InterruptedException { Thread controllerThread = newControllerThread(controller); controllerThreadWrapper.thread = controllerThread; // put the thread in the wrapper, so the lamba can interrupt it controllerThread.start(); - countDown.await(500, TimeUnit.MILLISECONDS); // a too short value does not allow the processLoop to start. + countDown.await(WAIT_TIME, TimeUnit.MILLISECONDS); // a too short value does not allow the processLoop to start. // Then ScheduledExecutorService resyncExecutor = controller.getResyncExecutor(); assertNotNull(resyncExecutor); @@ -156,7 +161,7 @@ void testControllerRunWithResyncPeriodGreaterThanZero() throws InterruptedExcept @Test @DisplayName("Controller with resync function throwing exception") - void testControllerRunsReyncFunctionThrowingException() throws InterruptedException { + void testControllerRunsResyncFunctionThrowingException() throws InterruptedException { // Given + When long fullResyncPeriod = 10L; int numberOfResyncs = 10; @@ -166,8 +171,7 @@ void testControllerRunsReyncFunctionThrowingException() throws InterruptedExcept () -> { countDown.countDown(); if( countDown.getCount() == 2 ) { - RuntimeException exception = new RuntimeException("make it fail"); - throw exception; + throw new RuntimeException("make it fail"); } return true; }, @@ -176,7 +180,7 @@ void testControllerRunsReyncFunctionThrowingException() throws InterruptedExcept Executable controllerRun = newControllerRun(controller); assertDoesNotThrow(controllerRun); // We give an extra cycle to avoid clock inaccurracy interruptions - countDown.await(( numberOfResyncs + 1 ) * fullResyncPeriod, TimeUnit.MILLISECONDS); + countDown.await(WAIT_TIME, TimeUnit.MILLISECONDS); controller.stop(); // Then assertThat(countDown.getCount()).isLessThanOrEqualTo(2); @@ -238,7 +242,7 @@ void testControllerRunWithResyncPeriodToZero() throws InterruptedException { void testControllerRunsReyncFunctionExpectedNumberOfTime() throws InterruptedException { // Given + When long fullResyncPeriod = 10L; - int numberOfResyncs = 100; + int numberOfResyncs = 20; final CountDownLatch countDown = new CountDownLatch(numberOfResyncs); Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, simpleEntries -> { }, @@ -248,7 +252,7 @@ void testControllerRunsReyncFunctionExpectedNumberOfTime() throws InterruptedExc Executable controllerRun = newControllerRun(controller); assertDoesNotThrow(controllerRun); // We give an extra cycle to avoid clock inaccurracy interruptions - countDown.await(( numberOfResyncs + 1 ) * fullResyncPeriod, TimeUnit.MILLISECONDS); + countDown.await(WAIT_TIME, TimeUnit.MILLISECONDS); controller.stop(); // Then ScheduledExecutorService resyncExecutor = controller.getResyncExecutor(); @@ -280,7 +284,7 @@ void testControllerNeverRunsReyncFunctionWhenPeriodIsZero() throws InterruptedEx assertThat(countDown.getCount()).isEqualTo(count); } - private class ThreadWrapper{ + private static class ThreadWrapper{ public Thread thread; public void interrupt() { if( thread != null) { @@ -290,22 +294,31 @@ public void interrupt() { } private Executable newControllerRun(Controller controller) { - return new Executable() { - @Override - public void execute() { - Thread controllerThread = newControllerThread(controller); - controllerThread.start(); - } + return () -> { + Thread controllerThread = newControllerThread(controller); + controllerThread.start(); }; } private Thread newControllerThread(Controller controller) { - Thread controllerThread = new Thread( new Runnable() { - @Override - public void run() { - controller.run(); - } - }); - return controllerThread; + return new Thread(controller::run); + } + + @Test + @DisplayName("Controller schedules resync tasks with fixed delay") + void testControllerRunSchedulesResyncTaskWithFixedDelay() { + // Given + ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class, Mockito.RETURNS_DEEP_STUBS); + Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, + simpleEntries -> { + }, + () -> true, + 1L, operationContext, eventListeners, scheduledExecutorService); + + // When + controller.scheduleResync(); + + // Then + verify(scheduledExecutorService, times(1)).scheduleWithFixedDelay(any(), eq(1L), eq(1L), any()); } }