Skip to content

Commit

Permalink
test: Update outdated tests
Browse files Browse the repository at this point in the history
  • Loading branch information
manusa committed May 24, 2021
1 parent 751fc28 commit df5d802
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
package io.fabric8.kubernetes.client.informers;

public interface SharedInformerEventListener {

/**
* @deprecated Use {@link #onException(SharedIndexInformer, Exception)} instead
*/
@Deprecated
void onException(Exception exception);

default void onException(SharedIndexInformer<?> informer, Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,20 @@ public class DefaultSharedIndexInformer<T extends HasMetadata, L extends Kuberne
// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler(i.e they don't specify one and just want to use the shared informer's default
// value).
private long defaultEventHandlerResyncPeriod;
private final long defaultEventHandlerResyncPeriod;

private final Reflector<T, L> reflector;
private final Class<T> apiTypeClass;
private final ProcessorStore<T> processorStore;
private final Cache<T> indexer;
private final SharedProcessor<T> processor;
private final Executor informerExecutor;

private AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean started = new AtomicBoolean();
private volatile boolean stopped = false;

private ScheduledFuture<?> resyncFuture;

public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod, OperationContext context, Executor informerExecutor) {
if (resyncPeriod < 0) {
throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value");
Expand All @@ -81,7 +81,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
this.processor = new SharedProcessor<>(new SerialExecutor(informerExecutor));
this.indexer = new Cache<>();
this.indexer.setIsRunning(this::isRunning);

processorStore = new ProcessorStore<>(this.indexer, this.processor);
this.reflector = new Reflector<>(apiTypeClass, listerWatcher, processorStore, context);
}
Expand Down Expand Up @@ -121,9 +121,9 @@ public void addEventHandlerWithResyncPeriod(ResourceEventHandler<T> handler, lon
}
}
}

ProcessorListener<T> listener = this.processor.addProcessorListener(handler, determineResyncPeriod(resyncPeriodMillis, this.resyncCheckPeriodMillis));

if (!started.get()) {
return;
}
Expand Down Expand Up @@ -151,7 +151,7 @@ public void run() {
log.info("informer#Controller: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis);

scheduleResync(processor::shouldResync);

reflector.listSyncAndWatch();
// stop called while run is called could be ineffective, check for it afterwards
synchronized (this) {
Expand Down Expand Up @@ -204,9 +204,9 @@ private long determineResyncPeriod(long desired, long check) {

@Override
public boolean isRunning() {
return !stopped && started.get() && reflector.isRunning();
return !stopped && started.get() && reflector.isRunning();
}

synchronized void scheduleResync(Supplier<Boolean> resyncFunc) {
// schedule the resync runnable
if (resyncCheckPeriodMillis > 0) {
Expand All @@ -216,15 +216,15 @@ synchronized void scheduleResync(Supplier<Boolean> resyncFunc) {
log.debug("informer#Controller: resync skipped due to 0 full resync period {}", apiTypeClass);
}
}

public long getFullResyncPeriod() {
return resyncCheckPeriodMillis;
}

ScheduledFuture<?> getResyncFuture() {
return resyncFuture;
}

@Override
public Class<T> getApiTypeClass() {
return apiTypeClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,27 @@
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.WebSocket;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

class AbstractWatchManagerTest {

private MockedStatic<Executors> executors;
private ScheduledExecutorService executorService;

@BeforeEach
void setUp() {
executorService = mock(ScheduledExecutorService.class, RETURNS_DEEP_STUBS);
executors = mockStatic(Executors.class);
executors.when(() -> Executors.newSingleThreadScheduledExecutor(any())).thenReturn(executorService);
}

@AfterEach
void tearDown() {
executors.close();
}

@Test
@DisplayName("closeEvent, is idempotent, multiple calls only close watcher once")
void closeEventIsIdempotent() {
Expand All @@ -71,7 +54,7 @@ void closeEventIsIdempotent() {
}

@Test
@DisplayName("closeEvent with Exception, is idempotent, multiple calls only close watcher once")
@DisplayName("closeEvent, with Exception, is idempotent, multiple calls only close watcher once")
void closeEventWithExceptionIsIdempotent() {
// Given
final WatcherAdapter<Object> watcher = new WatcherAdapter<>();
Expand Down Expand Up @@ -100,7 +83,7 @@ void closeWebSocket() {
void nextReconnectInterval() {
// Given
final WatchManager<Object> awm = new WatchManager<>(
null, mock(ListOptions.class), 0, 10, 5, null);
null, mock(ListOptions.class), 0, 10, 5);
// When-Then
assertThat(awm.nextReconnectInterval()).isEqualTo(10);
assertThat(awm.nextReconnectInterval()).isEqualTo(20);
Expand All @@ -111,18 +94,73 @@ void nextReconnectInterval() {
assertThat(awm.nextReconnectInterval()).isEqualTo(320);
}

@Test
@DisplayName("cancelReconnect, with null attempt, should do nothing")
void cancelReconnectNullAttempt() {
// Given
final ScheduledFuture<?> sf = spy(ScheduledFuture.class);
final WatcherAdapter<Object> watcher = new WatcherAdapter<>();
final WatchManager<Object> awm = withDefaultWatchManager(watcher);
// When
awm.cancelReconnect();
// Then
verify(sf, times(0)).cancel(true);
}

@Test
@DisplayName("cancelReconnect, with non-null attempt, should cancel")
void cancelReconnectNonNullAttempt() {
// Given
final ScheduledFuture<?> sf = mock(ScheduledFuture.class);
final MockedStatic<Utils> utils = mockStatic(Utils.class);
utils.when(() -> Utils.schedule(any(), any(), anyLong(), any())).thenReturn(sf);
final WatcherAdapter<Object> watcher = new WatcherAdapter<>();
final WatchManager<Object> awm = withDefaultWatchManager(watcher);
awm.scheduleReconnect(null, false);
// When
awm.cancelReconnect();
// Then
verify(sf, times(1)).cancel(true);
}

@Test
@DisplayName("isClosed, after close invocation, should return true")
void isForceClosedWhenClosed() {
// Given
final WatcherAdapter<Object> watcher = new WatcherAdapter<>();
final WatchManager<Object> awm = withDefaultWatchManager(watcher);
awm.initRunner(mock(AbstractWatchManager.ClientRunner.class));
// When
awm.close();
// Then
assertThat(awm.isForceClosed()).isTrue();
}

@Test
@DisplayName("close, after close invocation, should return true")
void closeWithNonNullRunnerShouldCancelRunner() {
// Given
final AbstractWatchManager.ClientRunner clientRunner = mock(AbstractWatchManager.ClientRunner.class);
final WatcherAdapter<Object> watcher = new WatcherAdapter<>();
final WatchManager<Object> awm = withDefaultWatchManager(watcher);
awm.initRunner(clientRunner);
// When
awm.close();
// Then
verify(clientRunner, times(1)).close();
}

private static <T> WatchManager<T> withDefaultWatchManager(Watcher<T> watcher) {
return new WatchManager<>(
watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 0, 0, 0,
mock(OkHttpClient.class));
watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 0, 0, 0);
}

private static class WatcherAdapter<T> implements Watcher<T> {
private final AtomicInteger closeCount = new AtomicInteger(0);

@Override
public void eventReceived(Action action, T resource) {}

@Override
public void onClose(WatcherException cause) {
closeCount.addAndGet(1);
Expand All @@ -136,20 +174,8 @@ public void onClose() {

private static final class WatchManager<T> extends AbstractWatchManager<T> {

public WatchManager(Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, OkHttpClient clonedClient) {
public WatchManager(Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) {
super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, resourceVersion -> null);
initRunner(new ClientRunner(clonedClient) {
@Override
void run(Request request) {}

@Override
OkHttpClient cloneAndCustomize(OkHttpClient client) {
return clonedClient;
}
});
}
@Override
public void close() {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -851,9 +852,14 @@ public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) {

@Test
void testRunAfterStop() {
// Given
SharedIndexInformer<Pod> podInformer = factory.sharedIndexInformerFor(Pod.class, 0);
podInformer.stop();
assertThrows(IllegalStateException.class, podInformer::run);
// When
final IllegalStateException result = assertThrows(IllegalStateException.class, podInformer::run);
// Then
assertThat(result)
.hasMessage("Cannot restart a stopped informer");
}

private KubernetesResource getAnimal(String name, String order, String resourceVersion) {
Expand Down

0 comments on commit df5d802

Please sign in to comment.