Skip to content

Commit

Permalink
wip: Watcher has dedicated close for exceptions and graceful close
Browse files Browse the repository at this point in the history
  • Loading branch information
manusa committed Nov 23, 2020
1 parent fba6601 commit 4b9c777
Show file tree
Hide file tree
Showing 13 changed files with 274 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,23 @@
*/
package io.fabric8.kubernetes.client;

import org.slf4j.LoggerFactory;

public interface Watcher<T> {

void eventReceived(Action action, T resource);

/**
* Run when the watcher finally closes.
* Invoked when the watcher is gracefully closed.
*/
default void onClose() {
LoggerFactory.getLogger(Watcher.class).debug("Watcher closed");
}

/**
* Invoked when the watcher closes due to an Exception.
*
* @param cause What caused the watcher to be closed. Null means normal close.
* @param cause What caused the watcher to be closed.
*/
void onClose(WatcherException cause);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ public void eventReceived(Action action, T resource) {

@Override
public void onClose(WatcherException cause) {
future.completeExceptionally(cause == null ? new WatcherException("Watcher closed") : cause);
future.completeExceptionally(cause);
}

@Override
public void onClose() {
future.completeExceptionally(new WatcherException("Watcher closed"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ final void closeEvent(WatcherException cause) {
watcher.onClose(cause);
}

final void closeEvent() {
if (forceClosed.getAndSet(true)) {
logger.debug("Ignoring duplicate firing of onClose event");
return;
}
watcher.onClose();
}

static void closeWebSocket(WebSocket webSocket) {
if (webSocket != null) {
logger.debug("Closing websocket {}", webSocket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void waitUntilReady() {
@Override
public void close() {
logger.debug("Force closing the watch {}", this);
closeEvent(null);
closeEvent();
closeWebSocket(webSocketRef.getAndSet(null));
if (!executor.isShutdown()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public void waitUntilReady() {
@Override
public void close() {
logger.debug("Force closing the watch {}", this);
closeEvent(null);
closeEvent();
closeWebSocket(webSocketRef.getAndSet(null));
if (!executor.isShutdown()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ public void onClose(WatcherException exception) {
onClose.run();
}

@Override
public void onClose() {
log.info("Watch gracefully closed");
onClose.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,11 @@ public void onClose(WatcherException cause) {
delegate.onClose(cause);
}
}

@Override
public void onClose() {
if (enabled) {
delegate.onClose();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.dsl.base;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.StatusBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager;
import io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@SuppressWarnings({"rawtypes", "FieldCanBeLocal"})
class BaseOperationWatchTest {

private Watcher<Pod> watcher;
private OperationContext operationContext;
private BaseOperation<Pod, PodList, PodResource<Pod>> baseOperation;

@SuppressWarnings("unchecked")
@BeforeEach
void setUp() {
watcher = mock(Watcher.class);
operationContext = mock(OperationContext.class, RETURNS_DEEP_STUBS);
baseOperation = new BaseOperation<>(operationContext);
}

@Test
@DisplayName("watch, with exception on connection open, should throw Exception and close WatchConnectionManager")
void watchWithExceptionOnOpen() {
try (final MockedConstruction<WatchConnectionManager> m = mockConstruction(WatchConnectionManager.class, (mock, context) -> {
// Given
doThrow(new KubernetesClientException("Mocked Connection Error")).when(mock).waitUntilReady();
})) {
// When
final KubernetesClientException result = assertThrows(KubernetesClientException.class,
() -> {
baseOperation.watch(watcher);
fail();
});
// Then
assertThat(result).hasMessage("Mocked Connection Error");
assertThat(m.constructed())
.hasSize(1)
.element(0)
.matches(wcm -> {
verify(wcm, times(1)).close();
return true;
});
}
}

@Test
@DisplayName("watch, with retryable exception on connection open, should close initial WatchConnectionManager and retry")
void watchWithRetryableExceptionOnOpen() {
try (
final MockedConstruction<WatchConnectionManager> m = mockConstruction(WatchConnectionManager.class, (mock, context) -> {
// Given
doThrow(new KubernetesClientException(new StatusBuilder().withCode(503).build())).when(mock).waitUntilReady();
});
final MockedConstruction<WatchHTTPManager> mHttp = mockConstruction(WatchHTTPManager.class)
) {
// When
final Watch result = baseOperation.watch(watcher);
// Then
assertThat(result).isInstanceOf(WatchHTTPManager.class).isSameAs(mHttp.constructed().get(0));
assertThat(m.constructed())
.hasSize(1)
.element(0)
.matches(wcm -> {
verify(wcm, times(1)).close();
return true;
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,22 @@ void itCompletesExceptionallyWithNoRetryOnCloseGone() throws Exception {
assertFalse(condition.isCalled());
}

@Test
void itCompletesExceptionallyWithRetryOnGracefulClose() throws Exception {
TrackingPredicate condition = condition(ss -> true);
WaitForConditionWatcher<ConfigMap> watcher = new WaitForConditionWatcher<>(condition);
watcher.onClose();
assertTrue(watcher.getFuture().isDone());
try {
watcher.getFuture().get();
fail("should have thrown exception");
} catch (ExecutionException e) {
assertEquals(WatcherException.class, e.getCause().getClass());
assertEquals("Watcher closed", e.getCause().getMessage());
assertTrue(((WatcherException) e.getCause()).isShouldRetry());
}
assertFalse(condition.isCalled());
}
private TrackingPredicate condition(Predicate<ConfigMap> condition) {
return new TrackingPredicate(condition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,32 @@ class AbstractWatchManagerTest {
@DisplayName("closeEvent, is idempotent, multiple calls only close watcher once")
void closeEventIsIdempotent() {
// Given
final AtomicInteger closeCount = new AtomicInteger();
final Watcher<Object> watcher = new WatcherAdapter<Object>() {
final WatcherAdapter<Object> watcher = new WatcherAdapter<>();
final WatchManager<Object> awm = new WatchManager<>(
watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 0, 0, 0,
mock(OkHttpClient.class));
// When
for (int it = 0; it < 10; it++) {
awm.closeEvent();
}
// Then
assertThat(watcher.closeCount.get()).isEqualTo(1);
}

@Override
public void onClose(WatcherException cause) {
closeCount.addAndGet(1);
}
};
@Test
@DisplayName("closeEvent with Exception, is idempotent, multiple calls only close watcher once")
void closeEventWithExceptionIsIdempotent() {
// Given
final WatcherAdapter<Object> watcher = new WatcherAdapter<>();
final WatchManager<Object> awm = new WatchManager<>(
watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 0, 0, 0,
mock(OkHttpClient.class));
// When
for (int it = 0; it < 10; it++) {
awm.closeEvent(null);
awm.closeEvent();
}
// Then
assertThat(closeCount.get()).isEqualTo(1);
assertThat(watcher.closeCount.get()).isEqualTo(1);
}

@Test
Expand All @@ -68,13 +77,20 @@ void closeWebSocket() {
}

private static class WatcherAdapter<T> implements Watcher<T> {
private final AtomicInteger closeCount = new AtomicInteger(0);

@Override
public void eventReceived(Action action, T resource) {

}
@Override
public void onClose(WatcherException cause) {
closeCount.addAndGet(1);
}

@Override
public void onClose() {
closeCount.addAndGet(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ public void eventReceived(Action action, Pod pod) {
}

@Override
public void onClose(WatcherException e) {
public void onClose(WatcherException cause) {

}

@Override
public void onClose() {
closeLatch.countDown();
logger.info("watch closed...");
}
Expand Down
Loading

0 comments on commit 4b9c777

Please sign in to comment.