Skip to content

Commit

Permalink
fixing fabric8io#3001 fabric8io#3186 over logging of websocket except…
Browse files Browse the repository at this point in the history
…ions and closure

this localizes the state of a websocket to the listener and removes the
runner abstraction

it also replaces the use of BlockingQueue with CompletableFuture.
  • Loading branch information
shawkins committed Jun 3, 2021
1 parent 71c1a8c commit 5c6ef3e
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 488 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,36 @@
*/
package io.fabric8.kubernetes.client.dsl.internal;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResource;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.WatchEvent;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.Watcher.Action;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public abstract class AbstractWatchManager<T> implements Watch {
import static java.net.HttpURLConnection.HTTP_GONE;

public abstract class AbstractWatchManager<T extends HasMetadata> implements Watch {

private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class);

Expand All @@ -47,12 +59,12 @@ public abstract class AbstractWatchManager<T> implements Watch {
private ScheduledFuture<?> reconnectAttempt;

private final RequestBuilder requestBuilder;
protected ClientRunner runner;
protected final OkHttpClient client;

private final AtomicBoolean reconnectPending = new AtomicBoolean(false);

AbstractWatchManager(
Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder
Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder, Supplier<OkHttpClient> clientSupplier
) {
this.watcher = watcher;
this.reconnectLimit = reconnectLimit;
Expand All @@ -61,23 +73,19 @@ public abstract class AbstractWatchManager<T> implements Watch {
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
this.currentReconnectAttempt = new AtomicInteger(0);
this.forceClosed = new AtomicBoolean();

this.requestBuilder = requestBuilder;
this.client = clientSupplier.get();

runWatch();
}

protected void initRunner(ClientRunner runner) {
if (this.runner != null) {
throw new IllegalStateException("ClientRunner has already been initialized");
}
this.runner = runner;
}

final void closeEvent(WatcherException cause) {
final void close(WatcherException cause) {
if (!watcher.reconnecting() && forceClosed.getAndSet(true)) {
logger.debug("Ignoring duplicate firing of onClose event");
return;
} else {
watcher.onClose(cause);
}
watcher.onClose(cause);
close();
}

final void closeEvent() {
Expand All @@ -94,22 +102,34 @@ final synchronized void cancelReconnect() {
}
}

void scheduleReconnect(Runnable command, boolean shouldBackoff) {
void scheduleReconnect() {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}

if (isForceClosed()) {
logger.warn("Ignoring error for already closed/closing connection");
return;
}

if (cannotReconnect()) {
close(new WatcherException("Exhausted reconnects"));
return;
}

logger.debug("Scheduling reconnect task");

long delay = shouldBackoff
? nextReconnectInterval()
: 0;
long delay = nextReconnectInterval();

synchronized (this) {
reconnectAttempt = Utils.schedule(Utils.getCommonExecutorSerive(), () -> {
try {
command.run();
runWatch();
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close(new WatcherException("Unhandled exception in reconnect attempt", e));
} finally {
reconnectPending.set(false);
}
Expand Down Expand Up @@ -145,10 +165,6 @@ void eventReceived(Watcher.Action action, T resource) {
watcher.eventReceived(action, resource);
}

void onClose(WatcherException cause) {
watcher.onClose(cause);
}

void updateResourceVersion(final String newResourceVersion) {
resourceVersion.set(newResourceVersion);
}
Expand All @@ -157,22 +173,18 @@ protected void runWatch() {
final Request request = requestBuilder.build(resourceVersion.get());
logger.debug("Watching {}...", request.url());

runner.run(request);
run(request);
}

public void waitUntilReady() {
runner.waitUntilReady();
}

static void closeWebSocket(WebSocket webSocket) {
if (webSocket != null) {
logger.debug("Closing websocket {}", webSocket);
try {
if (!webSocket.close(1000, null)) {
logger.warn("Failed to close websocket");
logger.debug("Websocket already closed {}", webSocket);
}
} catch (IllegalStateException e) {
logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage());
logger.error("invalid code for websocket: {} {}", e.getClass(), e.getMessage());
}
}
}
Expand All @@ -181,7 +193,7 @@ static void closeWebSocket(WebSocket webSocket) {
public void close() {
logger.debug("Force closing the watch {}", this);
closeEvent();
runner.close();
closeRequest();
cancelReconnect();
}

Expand All @@ -190,19 +202,90 @@ interface RequestBuilder {
Request build(final String resourceVersion);
}

abstract static class ClientRunner {
private final OkHttpClient client;
abstract void run(Request request);

protected ClientRunner(OkHttpClient client) {
this.client = cloneAndCustomize(client);
protected void closeRequest() {

}

public void waitUntilReady() {

}

protected WatchEvent readWatchEvent(String messageSource) {
WatchEvent event = Serialization.unmarshal(messageSource, WatchEvent.class);
KubernetesResource object = null;
if (event != null) {
object = event.getObject();
}
// when watching API Groups we don't get a WatchEvent resource
// so the object will be null
// so lets try parse the message as a KubernetesResource
// as it will probably be a list of resources like a BuildList
if (object == null) {
object = Serialization.unmarshal(messageSource, KubernetesResource.class);
if (event == null) {
event = new WatchEvent(object, "MODIFIED");
} else {
event.setObject(object);
}
}
if (event.getType() == null) {
event.setType("MODIFIED");
}
return event;
}

abstract void run(Request request);
void close() {}
void waitUntilReady() {}
abstract OkHttpClient cloneAndCustomize(OkHttpClient client);
OkHttpClient client() {
return client;
protected void onMessage(String message) {
try {
WatchEvent event = readWatchEvent(message);
Object object = event.getObject();
if (object instanceof HasMetadata) {
@SuppressWarnings("unchecked")
T obj = (T) object;
updateResourceVersion(obj.getMetadata().getResourceVersion());
Action action = Action.valueOf(event.getType());
eventReceived(action, obj);
} else if (object instanceof KubernetesResourceList) {
// Dirty cast - should always be valid though
KubernetesResourceList list = (KubernetesResourceList) object;
updateResourceVersion(list.getMetadata().getResourceVersion());
Action action = Action.valueOf(event.getType());
List<HasMetadata> items = list.getItems();
if (items != null) {
for (HasMetadata item : items) {
eventReceived(action, (T) item);
}
}
} else if (object instanceof Status) {
Status status = (Status) object;

onStatus(status);
} else {
logger.error("Unknown message received: {}", message);
}
} catch (ClassCastException e) {
logger.error("Received wrong type of object for watch", e);
} catch (IllegalArgumentException e) {
logger.error("Invalid event type", e);
} catch (Throwable e) {
logger.error("Unhandled exception encountered in watcher event handler", e);
}
}

protected boolean onStatus(Status status) {
// The resource version no longer exists - this has to be handled by the caller.
if (status.getCode() == HTTP_GONE) {
closeRequest(); // proactively close the request
// exception
// shut down executor, etc.
close(new WatcherException(status.getMessage(), new KubernetesClientException(status)));
return true;
}

eventReceived(Action.ERROR, null);
logger.error("Error received: {}", status);
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -81,8 +81,7 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final InputStreamPumper pumper;

private final AtomicBoolean started = new AtomicBoolean(false);
private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
private final CompletableFuture<Void> startedFuture = new CompletableFuture<>();
private final ExecListener listener;

private final AtomicBoolean explicitlyClosed = new AtomicBoolean(false);
Expand Down Expand Up @@ -185,7 +184,7 @@ private void closeWebSocketOnce(int code, String reason) {
}

public void waitUntilReady() {
Utils.waitUntilReady(queue, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS);
Utils.waitUntilReadyOrFail(startedFuture, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -207,11 +206,10 @@ public void onOpen(WebSocket webSocket, Response response) {
webSocketRef.set(webSocket);
if (!executorService.isShutdown()) {
executorService.submit(pumper);
started.set(true);
queue.add(true);
startedFuture.complete(null);
}
} catch (IOException e) {
queue.add(new KubernetesClientException(OperationSupport.createStatus(response)));
startedFuture.completeExceptionally(new KubernetesClientException(OperationSupport.createStatus(response)));
} finally {
if (listener != null) {
listener.onOpen(response);
Expand All @@ -234,10 +232,7 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
try {
Status status = OperationSupport.createStatus(response);
LOGGER.error("Exec Failure: HTTP:" + status.getCode() + ". Message:" + status.getMessage(), t);
//We only need to queue startup failures.
if (!started.get()) {
queue.add(new KubernetesClientException(status));
}
startedFuture.completeExceptionally(new KubernetesClientException(status));

cleanUpOnce();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.io.PipedOutputStream;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -53,8 +53,7 @@ public class LogWatchCallback implements LogWatch, Callback, AutoCloseable {
private final PipedInputStream output;
private final Set<Closeable> toClose = new LinkedHashSet<>();

private final AtomicBoolean started = new AtomicBoolean(false);
private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
private final CompletableFuture<Void> startedFuture = new CompletableFuture<>();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicBoolean closed = new AtomicBoolean(false);

Expand Down Expand Up @@ -116,7 +115,7 @@ private void cleanUp() {
}

public void waitUntilReady() {
if (!Utils.waitUntilReady(queue, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) {
if (!Utils.waitUntilReady(startedFuture, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.warn("Log watch request has not been opened within: " + config.getRequestTimeout() + " millis.");
}
Expand All @@ -136,10 +135,7 @@ public void onFailure(Call call, IOException ioe) {

LOGGER.error("Log Callback Failure.", ioe);
cleanUp();
//We only need to queue startup failures.
if (!started.get()) {
queue.add(ioe);
}
startedFuture.completeExceptionally(ioe);
}

@Override
Expand All @@ -157,8 +153,7 @@ public void onResponse(Call call, final Response response) throws IOException {

if (!executorService.isShutdown()) {
executorService.submit(pumper);
started.set(true);
queue.add(true);
startedFuture.complete(null);
}

}
Expand Down
Loading

0 comments on commit 5c6ef3e

Please sign in to comment.