Skip to content

Commit

Permalink
Fix #3016: Use scheduleWithFixedDelay for resync task execution
Browse files Browse the repository at this point in the history
+ When using `resyncExecutor.scheduleAtFixedRate` we're seeing inconsistent
  resync intervals since it doesn't wait for previous task to be
  terminated. Hence using `resyncExecutor.scheduleWithFixedDelay` instead.
+ Minor changes in ControllerTest regarding resync expectations/wait
  time
  • Loading branch information
rohanKanojia committed Apr 30, 2021
1 parent b4f36db commit e4da6aa
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 44 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* Fix #3000: Set `no_proxy` in the okhttp builder in case the `proxy_url` is null
* Fix #3011: properly handle enum types for additional printer columns
* Fix #3020: annotations should now properly have their associated values when processing CRDs from the API
* Fix #3016: Use scheduleWithFixedDelay for resync task execution
* Fix #2991: reduced the level of ReflectWatcher event recieved log
* Fix #3027: fix NPE when sorting events in KubernetesResourceUtil
* Fix missing entry for Trigger in TektonTriggersResourceMappingProvider

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<

private volatile boolean running;

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners, ScheduledExecutorService resyncExecutor) {
this.queue = queue;
this.listerWatcher = listerWatcher;
this.apiTypeClass = apiTypeClass;
Expand All @@ -87,21 +87,18 @@ public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L>
this.eventListeners = eventListeners;

// Starts one daemon thread for resync
this.resyncExecutor = Executors.newSingleThreadScheduledExecutor();
this.reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext);
this.resyncExecutor = resyncExecutor;
}

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
this(apiTypeClass, queue, listerWatcher, processFunc, resyncFunc, fullResyncPeriod, context, eventListeners, Executors.newSingleThreadScheduledExecutor());
}

public void run() {
log.info("informer#Controller: ready to run resync and reflector runnable");
// Start the resync runnable
if (fullResyncPeriod > 0) {
ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc);
if(!resyncExecutor.isShutdown()) {
resyncFuture = resyncExecutor.scheduleAtFixedRate(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS);
}
} else {
log.info("informer#Controller: resync skipped due to 0 full resync period");
}

scheduleResync();

try {
running = true;
Expand All @@ -117,6 +114,16 @@ public void run() {
}
}

void scheduleResync() {
// Start the resync runnable
if (fullResyncPeriod > 0) {
ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc);
resyncFuture = resyncExecutor.scheduleWithFixedDelay(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS);
} else {
log.info("informer#Controller: resync skipped due to 0 full resync period");
}
}

/**
* Stops the resync thread pool first, then stops the reflector.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package io.fabric8.kubernetes.client.informers.cache;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -37,13 +37,18 @@
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import io.fabric8.kubernetes.client.informers.SharedInformerEventListener;

class ControllerTest {
private DeltaFIFO<Pod> deltaFIFO = Mockito.mock(DeltaFIFO.class, Mockito.RETURNS_DEEP_STUBS);
private abstract class AbstractPodListerWatcher implements ListerWatcher<Pod, PodList> {};
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

private ListerWatcher<Pod, PodList> listerWatcher = Mockito.mock(AbstractPodListerWatcher.class, Mockito.RETURNS_DEEP_STUBS);
private OperationContext operationContext = Mockito.mock(OperationContext.class, Mockito.RETURNS_DEEP_STUBS);
private ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners = Mockito.mock(ConcurrentLinkedQueue.class, Mockito.RETURNS_DEEP_STUBS);
class ControllerTest {
private final DeltaFIFO<Pod> deltaFIFO = Mockito.mock(DeltaFIFO.class, Mockito.RETURNS_DEEP_STUBS);
private abstract static class AbstractPodListerWatcher implements ListerWatcher<Pod, PodList> {};
private static final Long WAIT_TIME = 500L;
private final ListerWatcher<Pod, PodList> listerWatcher = Mockito.mock(AbstractPodListerWatcher.class, Mockito.RETURNS_DEEP_STUBS);
private final OperationContext operationContext = Mockito.mock(OperationContext.class, Mockito.RETURNS_DEEP_STUBS);
private final ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners = Mockito.mock(ConcurrentLinkedQueue.class, Mockito.RETURNS_DEEP_STUBS);

@Test
@DisplayName("Controller initialized with resync period greater than zero should use provided resync period")
Expand Down Expand Up @@ -97,7 +102,7 @@ void testStop() {

@Test
@DisplayName("Controller initialized with resync period should have synced")
void testControllerHasSync() throws InterruptedException {
void testControllerHasSync() {
// Given + When
Controller<Pod, PodList> controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher,
simpleEntries -> { }, () -> true,
Expand All @@ -116,7 +121,7 @@ void testControllerHasSync() throws InterruptedException {
void testControllerRunWithInterruptedThread() throws InterruptedException {
// Given + When
// used to be able to interrupt the thread in the lambda
ThreadWrapper controllerThreadWrapper = new ThreadWrapper();
ThreadWrapper controllerThreadWrapper = new ThreadWrapper();
long fullResyncPeriod = 1L;
int numberOfResyncs = 1;
final CountDownLatch countDown = new CountDownLatch(numberOfResyncs);
Expand All @@ -130,7 +135,7 @@ void testControllerRunWithInterruptedThread() throws InterruptedException {
Thread controllerThread = newControllerThread(controller);
controllerThreadWrapper.thread = controllerThread; // put the thread in the wrapper, so the lamba can interrupt it
controllerThread.start();
countDown.await(500, TimeUnit.MILLISECONDS); // a too short value does not allow the processLoop to start.
countDown.await(WAIT_TIME, TimeUnit.MILLISECONDS); // a too short value does not allow the processLoop to start.
// Then
ScheduledExecutorService resyncExecutor = controller.getResyncExecutor();
assertNotNull(resyncExecutor);
Expand All @@ -156,7 +161,7 @@ void testControllerRunWithResyncPeriodGreaterThanZero() throws InterruptedExcept

@Test
@DisplayName("Controller with resync function throwing exception")
void testControllerRunsReyncFunctionThrowingException() throws InterruptedException {
void testControllerRunsResyncFunctionThrowingException() throws InterruptedException {
// Given + When
long fullResyncPeriod = 10L;
int numberOfResyncs = 10;
Expand All @@ -166,17 +171,15 @@ void testControllerRunsReyncFunctionThrowingException() throws InterruptedExcept
() -> {
countDown.countDown();
if( countDown.getCount() == 2 ) {
RuntimeException exception = new RuntimeException("make it fail");
throw exception;
throw new RuntimeException("make it fail");
}
return true;
},
fullResyncPeriod, operationContext, eventListeners);

Executable controllerRun = newControllerRun(controller);
assertDoesNotThrow(controllerRun);
// We give an extra cycle to avoid clock inaccurracy interruptions
countDown.await(( numberOfResyncs + 1 ) * fullResyncPeriod, TimeUnit.MILLISECONDS);
countDown.await(WAIT_TIME, TimeUnit.MILLISECONDS);
controller.stop();
// Then
assertThat(countDown.getCount()).isLessThanOrEqualTo(2);
Expand Down Expand Up @@ -238,7 +241,7 @@ void testControllerRunWithResyncPeriodToZero() throws InterruptedException {
void testControllerRunsReyncFunctionExpectedNumberOfTime() throws InterruptedException {
// Given + When
long fullResyncPeriod = 10L;
int numberOfResyncs = 100;
int numberOfResyncs = 10;
final CountDownLatch countDown = new CountDownLatch(numberOfResyncs);
Controller<Pod, PodList> controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher,
simpleEntries -> { },
Expand All @@ -248,13 +251,12 @@ void testControllerRunsReyncFunctionExpectedNumberOfTime() throws InterruptedExc
Executable controllerRun = newControllerRun(controller);
assertDoesNotThrow(controllerRun);
// We give an extra cycle to avoid clock inaccurracy interruptions
countDown.await(( numberOfResyncs + 1 ) * fullResyncPeriod, TimeUnit.MILLISECONDS);
countDown.await(WAIT_TIME, TimeUnit.MILLISECONDS);
controller.stop();
// Then
ScheduledExecutorService resyncExecutor = controller.getResyncExecutor();
assertNotNull(resyncExecutor);
assertThat(resyncExecutor.isShutdown()).isTrue();
assertThat(resyncExecutor.isTerminated()).isTrue();
assertThat(countDown.getCount()).isLessThanOrEqualTo(1);
}

Expand All @@ -270,7 +272,6 @@ void testControllerNeverRunsReyncFunctionWhenPeriodIsZero() throws InterruptedEx
0, operationContext, eventListeners);
Executable controllerRun = newControllerRun(controller);
assertDoesNotThrow(controllerRun);
// We give an extra cycle to avoid clock inaccurracy interruptions
countDown.await(1000, TimeUnit.MILLISECONDS);
controller.stop();
// Then
Expand All @@ -280,7 +281,7 @@ void testControllerNeverRunsReyncFunctionWhenPeriodIsZero() throws InterruptedEx
assertThat(countDown.getCount()).isEqualTo(count);
}

private class ThreadWrapper{
private static class ThreadWrapper{
public Thread thread;
public void interrupt() {
if( thread != null) {
Expand All @@ -290,22 +291,31 @@ public void interrupt() {
}

private Executable newControllerRun(Controller<Pod, PodList> controller) {
return new Executable() {
@Override
public void execute() {
Thread controllerThread = newControllerThread(controller);
controllerThread.start();
}
return () -> {
Thread controllerThread = newControllerThread(controller);
controllerThread.start();
};
}

private Thread newControllerThread(Controller<Pod, PodList> controller) {
Thread controllerThread = new Thread( new Runnable() {
@Override
public void run() {
controller.run();
}
});
return controllerThread;
return new Thread(controller::run);
}

@Test
@DisplayName("Controller schedules resync tasks with fixed delay")
void testControllerRunSchedulesResyncTaskWithFixedDelay() {
// Given
ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class, Mockito.RETURNS_DEEP_STUBS);
Controller<Pod, PodList> controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher,
simpleEntries -> {
},
() -> true,
1L, operationContext, eventListeners, scheduledExecutorService);

// When
controller.scheduleResync();

// Then
verify(scheduledExecutorService, times(1)).scheduleWithFixedDelay(any(), eq(1L), eq(1L), any());
}
}

0 comments on commit e4da6aa

Please sign in to comment.