Skip to content

Commit

Permalink
fixing #3001 #3186: over-logging of websocket exceptions and closure
Browse files Browse the repository at this point in the history
this removes the runner abstraction

it also replaces the use of BlockingQueue with CompletableFuture.
  • Loading branch information
shawkins authored and manusa committed Jun 9, 2021
1 parent 20d87a8 commit c8879af
Show file tree
Hide file tree
Showing 13 changed files with 383 additions and 541 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,42 @@
*/
package io.fabric8.kubernetes.client.dsl.internal;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResource;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.utils.ExponentialBackoffIntervalCalculator;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.WatchEvent;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.Watcher.Action;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public abstract class AbstractWatchManager<T> implements Watch {
import static java.net.HttpURLConnection.HTTP_GONE;

public abstract class AbstractWatchManager<T extends HasMetadata> implements Watch {

/**
 * Factory for the HTTP request used to (re)open the watch.
 */
@FunctionalInterface
interface RequestBuilder {
  /**
   * Builds the watch request.
   *
   * @param resourceVersion the resource version handed to the builder by the watch manager
   * @return the request to execute
   */
  Request build(String resourceVersion);
}

private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class);

final Watcher<T> watcher;
Expand All @@ -47,36 +63,39 @@ public abstract class AbstractWatchManager<T> implements Watch {
private ScheduledFuture<?> reconnectAttempt;

private final RequestBuilder requestBuilder;
protected ClientRunner runner;
protected final OkHttpClient client;

private final AtomicBoolean reconnectPending = new AtomicBoolean(false);

AbstractWatchManager(
Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder
Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder, Supplier<OkHttpClient> clientSupplier
) {
this.watcher = watcher;
this.reconnectLimit = reconnectLimit;
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, maxIntervalExponent);
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
this.currentReconnectAttempt = new AtomicInteger(0);
this.forceClosed = new AtomicBoolean();

this.requestBuilder = requestBuilder;
this.client = clientSupplier.get();

runWatch();
}

protected void initRunner(ClientRunner runner) {
if (this.runner != null) {
throw new IllegalStateException("ClientRunner has already been initialized");
}
this.runner = runner;
}

final void closeEvent(WatcherException cause) {
protected abstract void run(Request request);

protected abstract void closeRequest();

final void close(WatcherException cause) {
// proactively close the request (it will be called again in close)
// for reconnecting watchers, we may not complete onClose for a while
closeRequest();
if (!watcher.reconnecting() && forceClosed.getAndSet(true)) {
logger.debug("Ignoring duplicate firing of onClose event");
return;
} else {
watcher.onClose(cause);
}
watcher.onClose(cause);
close();
}

final void closeEvent() {
Expand All @@ -93,27 +112,42 @@ final synchronized void cancelReconnect() {
}
}

void scheduleReconnect(Runnable command, boolean shouldBackoff) {
void scheduleReconnect() {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}

if (isForceClosed()) {
logger.warn("Ignoring error for already closed/closing connection");
return;
}

if (cannotReconnect()) {
close(new WatcherException("Exhausted reconnects"));
return;
}

logger.debug("Scheduling reconnect task");

long delay = shouldBackoff
? nextReconnectInterval()
: 0;
long delay = nextReconnectInterval();

synchronized (this) {
reconnectAttempt = Utils.schedule(Utils.getCommonExecutorSerive(), () -> {
try {
command.run();
runWatch();
if (isForceClosed()) {
closeRequest();
}
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close(new WatcherException("Unhandled exception in reconnect attempt", e));
} finally {
reconnectPending.set(false);
}
}, delay, TimeUnit.MILLISECONDS);
if (forceClosed.get()) {
if (isForceClosed()) {
cancelReconnect();
}
}
Expand Down Expand Up @@ -142,10 +176,6 @@ void eventReceived(Watcher.Action action, T resource) {
watcher.eventReceived(action, resource);
}

void onClose(WatcherException cause) {
watcher.onClose(cause);
}

/**
 * Stores the given value as the current resource version of this watch.
 *
 * @param newResourceVersion the resource version to remember
 */
void updateResourceVersion(String newResourceVersion) {
  this.resourceVersion.set(newResourceVersion);
}
Expand All @@ -154,52 +184,89 @@ protected void runWatch() {
final Request request = requestBuilder.build(resourceVersion.get());
logger.debug("Watching {}...", request.url());

runner.run(request);
}

public void waitUntilReady() {
runner.waitUntilReady();
}

static void closeWebSocket(WebSocket webSocket) {
if (webSocket != null) {
logger.debug("Closing websocket {}", webSocket);
try {
if (!webSocket.close(1000, null)) {
logger.warn("Failed to close websocket");
}
} catch (IllegalStateException e) {
logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage());
}
}
closeRequest(); // only one can be active at a time
run(request);
}

@Override
public void close() {
logger.debug("Force closing the watch {}", this);
closeEvent();
runner.close();
closeRequest();
cancelReconnect();
}

@FunctionalInterface
interface RequestBuilder {
Request build(final String resourceVersion);
/**
 * Parses a raw watch message into a {@link WatchEvent}.
 *
 * <p>The message is first unmarshalled as a {@link WatchEvent}. If that yields no event, or an
 * event without an object, the message is unmarshalled again as a plain
 * {@link KubernetesResource} and used as the event's object. An event without a type is given
 * the type {@code "MODIFIED"}.
 *
 * @param messageSource the raw message text
 * @return the parsed event, never {@code null}
 */
protected WatchEvent readWatchEvent(String messageSource) {
  WatchEvent parsed = Serialization.unmarshal(messageSource, WatchEvent.class);
  KubernetesResource payload = parsed == null ? null : parsed.getObject();

  if (payload == null) {
    // When watching API Groups the message is not a WatchEvent, so there is no object;
    // try parsing the whole message as a KubernetesResource instead (probably a list of
    // resources such as a BuildList).
    payload = Serialization.unmarshal(messageSource, KubernetesResource.class);
    if (parsed != null) {
      parsed.setObject(payload);
    } else {
      parsed = new WatchEvent(payload, "MODIFIED");
    }
  }

  if (parsed.getType() == null) {
    parsed.setType("MODIFIED");
  }
  return parsed;
}

abstract static class ClientRunner {
private final OkHttpClient client;

protected ClientRunner(OkHttpClient client) {
this.client = cloneAndCustomize(client);
/**
 * Dispatches a raw watch message according to the type of object it carries.
 *
 * <ul>
 * <li>a {@link HasMetadata}: records its resource version and forwards a single event;</li>
 * <li>a {@link KubernetesResourceList}: records the list's resource version and forwards one
 * event per item, all with the action of the enclosing event;</li>
 * <li>a {@link Status}: delegates to {@link #onStatus(Status)};</li>
 * <li>anything else: logged as an unknown message.</li>
 * </ul>
 *
 * <p>Exceptions raised while handling the message are logged and not rethrown.
 *
 * @param message the raw message text
 */
protected void onMessage(String message) {
  try {
    final WatchEvent watchEvent = readWatchEvent(message);
    final Object payload = watchEvent.getObject();

    if (payload instanceof HasMetadata) {
      @SuppressWarnings("unchecked")
      T resource = (T) payload;
      // The resource version is recorded before the event type is validated.
      updateResourceVersion(resource.getMetadata().getResourceVersion());
      eventReceived(Action.valueOf(watchEvent.getType()), resource);
      return;
    }

    if (payload instanceof KubernetesResourceList) {
      // Dirty cast - should always be valid though
      KubernetesResourceList resourceList = (KubernetesResourceList) payload;
      updateResourceVersion(resourceList.getMetadata().getResourceVersion());
      Action listAction = Action.valueOf(watchEvent.getType());
      List<HasMetadata> entries = resourceList.getItems();
      if (entries == null) {
        return;
      }
      for (HasMetadata entry : entries) {
        eventReceived(listAction, (T) entry);
      }
      return;
    }

    if (payload instanceof Status) {
      onStatus((Status) payload);
      return;
    }

    logger.error("Unknown message received: {}", message);
  } catch (ClassCastException e) {
    logger.error("Received wrong type of object for watch", e);
  } catch (IllegalArgumentException e) {
    logger.error("Invalid event type", e);
  } catch (Exception e) {
    logger.error("Unhandled exception encountered in watcher event handler", e);
  }
}

abstract void run(Request request);
void close() {}
void waitUntilReady() {}
abstract OkHttpClient cloneAndCustomize(OkHttpClient client);
OkHttpClient client() {
return client;
/**
 * Handles a {@link Status} object received on the watch.
 *
 * <p>A status with code {@code HTTP_GONE} closes the watch with a {@link WatcherException}
 * whose cause is a {@link KubernetesClientException} built from the status. Any other status is
 * forwarded as an {@link Action#ERROR} event with a {@code null} resource and logged.
 *
 * @param status the status received
 * @return {@code true} if the status code was {@code HTTP_GONE} and the watch was closed,
 *         {@code false} otherwise
 */
protected boolean onStatus(Status status) {
  final boolean resourceVersionGone = status.getCode() == HTTP_GONE;
  if (!resourceVersionGone) {
    eventReceived(Action.ERROR, null);
    logger.error("Error received: {}", status);
    return false;
  }

  // The resource version no longer exists - this has to be handled by the caller.
  final String reason = status.getMessage();
  final KubernetesClientException cause = new KubernetesClientException(status);
  close(new WatcherException(reason, cause));
  return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -81,8 +81,7 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final InputStreamPumper pumper;

private final AtomicBoolean started = new AtomicBoolean(false);
private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
private final CompletableFuture<Void> startedFuture = new CompletableFuture<>();
private final ExecListener listener;

private final AtomicBoolean explicitlyClosed = new AtomicBoolean(false);
Expand Down Expand Up @@ -185,7 +184,7 @@ private void closeWebSocketOnce(int code, String reason) {
}

public void waitUntilReady() {
Utils.waitUntilReady(queue, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS);
Utils.waitUntilReadyOrFail(startedFuture, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -207,11 +206,10 @@ public void onOpen(WebSocket webSocket, Response response) {
webSocketRef.set(webSocket);
if (!executorService.isShutdown()) {
executorService.submit(pumper);
started.set(true);
queue.add(true);
startedFuture.complete(null);
}
} catch (IOException e) {
queue.add(new KubernetesClientException(OperationSupport.createStatus(response)));
startedFuture.completeExceptionally(new KubernetesClientException(OperationSupport.createStatus(response)));
} finally {
if (listener != null) {
listener.onOpen(response);
Expand All @@ -234,10 +232,7 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
try {
Status status = OperationSupport.createStatus(response);
LOGGER.error("Exec Failure: HTTP:" + status.getCode() + ". Message:" + status.getMessage(), t);
//We only need to queue startup failures.
if (!started.get()) {
queue.add(new KubernetesClientException(status));
}
startedFuture.completeExceptionally(new KubernetesClientException(status));

cleanUpOnce();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.io.PipedOutputStream;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -53,8 +53,7 @@ public class LogWatchCallback implements LogWatch, Callback, AutoCloseable {
private final PipedInputStream output;
private final Set<Closeable> toClose = new LinkedHashSet<>();

private final AtomicBoolean started = new AtomicBoolean(false);
private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
private final CompletableFuture<Void> startedFuture = new CompletableFuture<>();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicBoolean closed = new AtomicBoolean(false);

Expand Down Expand Up @@ -116,7 +115,7 @@ private void cleanUp() {
}

public void waitUntilReady() {
if (!Utils.waitUntilReady(queue, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) {
if (!Utils.waitUntilReady(startedFuture, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.warn("Log watch request has not been opened within: " + config.getRequestTimeout() + " millis.");
}
Expand All @@ -136,10 +135,7 @@ public void onFailure(Call call, IOException ioe) {

LOGGER.error("Log Callback Failure.", ioe);
cleanUp();
//We only need to queue startup failures.
if (!started.get()) {
queue.add(ioe);
}
startedFuture.completeExceptionally(ioe);
}

@Override
Expand All @@ -157,8 +153,7 @@ public void onResponse(Call call, final Response response) throws IOException {

if (!executorService.isShutdown()) {
executorService.submit(pumper);
started.set(true);
queue.add(true);
startedFuture.complete(null);
}

}
Expand Down
Loading

0 comments on commit c8879af

Please sign in to comment.