Skip to content

Commit

Permalink
fixing fabric8io#3001 fabric8io#3186 over logging of websocket except…
Browse files Browse the repository at this point in the history
…ions and closure

this localizes the state of a websocket to the listener, rather than the
runner to have a clearer lifecycle.

it also replaces the use of BlockingQueue with CompletableFuture.
  • Loading branch information
shawkins committed Jun 2, 2021
1 parent 71c1a8c commit 22d7041
Show file tree
Hide file tree
Showing 17 changed files with 175 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ public Watch watch(ListOptions options, final Watcher<T> watcher) {
config.getWatchReconnectLimit(),
config.getWebsocketTimeout()
);
watch.waitUntilReady();
watch.runAndWaitUntilReady();
return watch;
} catch (MalformedURLException e) {
throw KubernetesClientException.launderThrowable(forOperationType("watch"), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ protected void initRunner(ClientRunner runner) {
this.runner = runner;
}

final void closeEvent(WatcherException cause) {
final void close(WatcherException cause) {
if (!watcher.reconnecting() && forceClosed.getAndSet(true)) {
logger.debug("Ignoring duplicate firing of onClose event");
return;
} else {
watcher.onClose(cause);
}
watcher.onClose(cause);
close();
}

final void closeEvent() {
Expand All @@ -94,7 +95,7 @@ final synchronized void cancelReconnect() {
}
}

void scheduleReconnect(Runnable command, boolean shouldBackoff) {
void scheduleReconnect(boolean shouldBackoff) {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
Expand All @@ -109,7 +110,11 @@ void scheduleReconnect(Runnable command, boolean shouldBackoff) {
synchronized (this) {
reconnectAttempt = Utils.schedule(Utils.getCommonExecutorSerive(), () -> {
try {
command.run();
runWatch();
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close(new WatcherException("Unhandled exception in reconnect attempt", e));
} finally {
reconnectPending.set(false);
}
Expand Down Expand Up @@ -160,7 +165,8 @@ protected void runWatch() {
runner.run(request);
}

public void waitUntilReady() {
public void runAndWaitUntilReady() {
runWatch();
runner.waitUntilReady();
}

Expand All @@ -169,10 +175,10 @@ static void closeWebSocket(WebSocket webSocket) {
logger.debug("Closing websocket {}", webSocket);
try {
if (!webSocket.close(1000, null)) {
logger.warn("Failed to close websocket");
logger.debug("Websocket already closed {}", webSocket);
}
} catch (IllegalStateException e) {
logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage());
logger.error("invalid code for websocket: {} {}", e.getClass(), e.getMessage());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -81,8 +81,7 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final InputStreamPumper pumper;

private final AtomicBoolean started = new AtomicBoolean(false);
private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
private final CompletableFuture<Void> startedFuture = new CompletableFuture<>();
private final ExecListener listener;

private final AtomicBoolean explicitlyClosed = new AtomicBoolean(false);
Expand Down Expand Up @@ -185,7 +184,7 @@ private void closeWebSocketOnce(int code, String reason) {
}

public void waitUntilReady() {
Utils.waitUntilReady(queue, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS);
Utils.waitUntilReadyOrFail(startedFuture, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -207,11 +206,10 @@ public void onOpen(WebSocket webSocket, Response response) {
webSocketRef.set(webSocket);
if (!executorService.isShutdown()) {
executorService.submit(pumper);
started.set(true);
queue.add(true);
startedFuture.complete(null);
}
} catch (IOException e) {
queue.add(new KubernetesClientException(OperationSupport.createStatus(response)));
startedFuture.completeExceptionally(new KubernetesClientException(OperationSupport.createStatus(response)));
} finally {
if (listener != null) {
listener.onOpen(response);
Expand All @@ -234,10 +232,7 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
try {
Status status = OperationSupport.createStatus(response);
LOGGER.error("Exec Failure: HTTP:" + status.getCode() + ". Message:" + status.getMessage(), t);
//We only need to queue startup failures.
if (!started.get()) {
queue.add(new KubernetesClientException(status));
}
startedFuture.completeExceptionally(new KubernetesClientException(status));

cleanUpOnce();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.io.PipedOutputStream;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -53,8 +53,7 @@ public class LogWatchCallback implements LogWatch, Callback, AutoCloseable {
private final PipedInputStream output;
private final Set<Closeable> toClose = new LinkedHashSet<>();

private final AtomicBoolean started = new AtomicBoolean(false);
private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
private final CompletableFuture<Void> startedFuture = new CompletableFuture<>();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicBoolean closed = new AtomicBoolean(false);

Expand Down Expand Up @@ -116,7 +115,7 @@ private void cleanUp() {
}

public void waitUntilReady() {
if (!Utils.waitUntilReady(queue, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) {
if (!Utils.waitUntilReady(startedFuture, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.warn("Log watch request has not been opened within: " + config.getRequestTimeout() + " millis.");
}
Expand All @@ -136,10 +135,7 @@ public void onFailure(Call call, IOException ioe) {

LOGGER.error("Log Callback Failure.", ioe);
cleanUp();
//We only need to queue startup failures.
if (!started.get()) {
queue.add(ioe);
}
startedFuture.completeExceptionally(ioe);
}

@Override
Expand All @@ -157,8 +153,7 @@ public void onResponse(Call call, final Response response) throws IOException {

if (!executorService.isShutdown()) {
executorService.submit(pumper);
started.set(true);
queue.add(true);
startedFuture.complete(null);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ public Watch watch(Map<String, String> labels, ListOptions options, Watcher<Stri
getConfig() != null ? getConfig().getWatchReconnectLimit() : -1,
getConfig() != null ? getConfig().getWatchReconnectInterval() : 1000,
5);
watch.waitUntilReady();
watch.runAndWaitUntilReady();
return watch;
} catch (KubernetesClientException ke) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

/**
* This class just replicates WatchConnectionManager in handling watch connections but
Expand All @@ -38,23 +36,22 @@ public RawWatchConnectionManager(OkHttpClient okHttpClient, HttpUrl.Builder watc

initRunner(new WebSocketClientRunner<String>(okHttpClient) {
@Override
WatcherWebSocketListener<String> newListener(BlockingQueue<Object> queue, AtomicReference<WebSocket> webSocketRef) {
return new RawWatcherWebSocketListener(RawWatchConnectionManager.this, queue, webSocketRef, objectMapper);
WatcherWebSocketListener<String> newListener() {
return new RawWatcherWebSocketListener(RawWatchConnectionManager.this, objectMapper);
}

@Override
OkHttpClient cloneAndCustomize(OkHttpClient client) {
return okHttpClient.newBuilder().build();
}
});
runWatch();
}

private static class RawWatcherWebSocketListener extends WatcherWebSocketListener<String> {
private final ObjectMapper objectMapper;

public RawWatcherWebSocketListener(AbstractWatchManager<String> manager, BlockingQueue<Object> queue, AtomicReference<WebSocket> webSocketRef, ObjectMapper objectMapper) {
super(manager, queue, webSocketRef);
public RawWatcherWebSocketListener(AbstractWatchManager<String> manager, ObjectMapper objectMapper) {
super(manager);
this.objectMapper = objectMapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
Expand All @@ -46,8 +44,8 @@ public WatchConnectionManager(final OkHttpClient client, final BaseOperation<T,

initRunner(new WebSocketClientRunner<T>(client) {
@Override
WatcherWebSocketListener<T> newListener(BlockingQueue<Object> queue, AtomicReference<WebSocket> webSocketRef) {
return new TypedWatcherWebSocketListener<>(WatchConnectionManager.this, queue, webSocketRef);
WatcherWebSocketListener<T> newListener() {
return new TypedWatcherWebSocketListener<>(WatchConnectionManager.this);
}

@Override
Expand All @@ -57,7 +55,6 @@ OkHttpClient cloneAndCustomize(OkHttpClient client) {
.build();
}
});
runWatch();
}

public WatchConnectionManager(final OkHttpClient client, final BaseOperation<T, L, ?> baseOperation, final ListOptions listOptions, final Watcher<T> watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout) throws MalformedURLException {
Expand All @@ -66,8 +63,8 @@ public WatchConnectionManager(final OkHttpClient client, final BaseOperation<T,
}

private static class TypedWatcherWebSocketListener<T extends HasMetadata> extends WatcherWebSocketListener<T> {
public TypedWatcherWebSocketListener(AbstractWatchManager<T> manager, BlockingQueue<Object> queue, AtomicReference<WebSocket> webSocketRef) {
super(manager, queue, webSocketRef);
public TypedWatcherWebSocketListener(AbstractWatchManager<T> manager) {
super(manager);
}

@Override
Expand Down Expand Up @@ -97,11 +94,10 @@ public void onMessage(WebSocket webSocket, String message) {

// The resource version no longer exists - this has to be handled by the caller.
if (status.getCode() == HTTP_GONE) {
webSocketRef.set(null); // lose the ref: closing in close() would only generate a Broken pipe
close(); // proactively close the websocket
// exception
// shut down executor, etc.
manager.closeEvent(new WatcherException(status.getMessage(), new KubernetesClientException(status)));
manager.close();
manager.close(new WatcherException(status.getMessage(), new KubernetesClientException(status)));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ OkHttpClient cloneAndCustomize(OkHttpClient client) {
return clonedClient;
}
});

runWatch();
}

private abstract static class HTTPClientRunner<T extends HasMetadata> extends AbstractWatchManager.ClientRunner {
Expand Down Expand Up @@ -152,16 +150,7 @@ private void scheduleReconnect(boolean shouldBackoff) {
return;
}

manager.scheduleReconnect(() -> {
try {
manager.runWatch();
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close();
manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e));
}
}, shouldBackoff);
manager.scheduleReconnect(shouldBackoff);
}

public void onMessage(String messageSource) {
Expand Down
Loading

0 comments on commit 22d7041

Please sign in to comment.