Skip to content

Commit

Permalink
fixing fabric8io#3001 fabric8io#3186 over logging of websocket except…
Browse files Browse the repository at this point in the history
…ions and closure

this localizes the state of a websocket to the listener, rather than the
runner to have a clearer lifecycle.

it also replaces the use of BlockingQueue with CompletableFuture.
  • Loading branch information
shawkins committed May 31, 2021
1 parent 71c1a8c commit c360a08
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ static void closeWebSocket(WebSocket webSocket) {
logger.debug("Closing websocket {}", webSocket);
try {
if (!webSocket.close(1000, null)) {
logger.warn("Failed to close websocket");
logger.debug("Websocket already closed {}", webSocket);
}
} catch (IllegalStateException e) {
logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage());
logger.error("invalid code for websocket: {} {}", e.getClass(), e.getMessage());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -81,8 +81,7 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final InputStreamPumper pumper;

private final AtomicBoolean started = new AtomicBoolean(false);
private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
private final CompletableFuture<Boolean> startedFuture = new CompletableFuture<>();
private final ExecListener listener;

private final AtomicBoolean explicitlyClosed = new AtomicBoolean(false);
Expand Down Expand Up @@ -185,7 +184,7 @@ private void closeWebSocketOnce(int code, String reason) {
}

public void waitUntilReady() {
Utils.waitUntilReady(queue, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS);
Utils.waitUntilReady(startedFuture, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -207,11 +206,10 @@ public void onOpen(WebSocket webSocket, Response response) {
webSocketRef.set(webSocket);
if (!executorService.isShutdown()) {
executorService.submit(pumper);
started.set(true);
queue.add(true);
startedFuture.complete(true);
}
} catch (IOException e) {
queue.add(new KubernetesClientException(OperationSupport.createStatus(response)));
startedFuture.completeExceptionally(new KubernetesClientException(OperationSupport.createStatus(response)));
} finally {
if (listener != null) {
listener.onOpen(response);
Expand All @@ -234,10 +232,7 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
try {
Status status = OperationSupport.createStatus(response);
LOGGER.error("Exec Failure: HTTP:" + status.getCode() + ". Message:" + status.getMessage(), t);
//We only need to queue startup failures.
if (!started.get()) {
queue.add(new KubernetesClientException(status));
}
startedFuture.completeExceptionally(new KubernetesClientException(status));

cleanUpOnce();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.io.PipedOutputStream;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -53,8 +53,7 @@ public class LogWatchCallback implements LogWatch, Callback, AutoCloseable {
private final PipedInputStream output;
private final Set<Closeable> toClose = new LinkedHashSet<>();

private final AtomicBoolean started = new AtomicBoolean(false);
private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
private final CompletableFuture<Boolean> startedFuture = new CompletableFuture<>();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicBoolean closed = new AtomicBoolean(false);

Expand Down Expand Up @@ -116,7 +115,7 @@ private void cleanUp() {
}

public void waitUntilReady() {
if (!Utils.waitUntilReady(queue, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) {
if (!Utils.waitUntilReady(startedFuture, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.warn("Log watch request has not been opened within: " + config.getRequestTimeout() + " millis.");
}
Expand All @@ -136,10 +135,7 @@ public void onFailure(Call call, IOException ioe) {

LOGGER.error("Log Callback Failure.", ioe);
cleanUp();
//We only need to queue startup failures.
if (!started.get()) {
queue.add(ioe);
}
startedFuture.completeExceptionally(ioe);
}

@Override
Expand All @@ -157,8 +153,7 @@ public void onResponse(Call call, final Response response) throws IOException {

if (!executorService.isShutdown()) {
executorService.submit(pumper);
started.set(true);
queue.add(true);
startedFuture.complete(true);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -38,8 +38,8 @@ public RawWatchConnectionManager(OkHttpClient okHttpClient, HttpUrl.Builder watc

initRunner(new WebSocketClientRunner<String>(okHttpClient) {
@Override
WatcherWebSocketListener<String> newListener(BlockingQueue<Object> queue, AtomicReference<WebSocket> webSocketRef) {
return new RawWatcherWebSocketListener(RawWatchConnectionManager.this, queue, webSocketRef, objectMapper);
WatcherWebSocketListener<String> newListener() {
return new RawWatcherWebSocketListener(RawWatchConnectionManager.this, objectMapper);
}

@Override
Expand All @@ -53,8 +53,8 @@ OkHttpClient cloneAndCustomize(OkHttpClient client) {
private static class RawWatcherWebSocketListener extends WatcherWebSocketListener<String> {
private final ObjectMapper objectMapper;

public RawWatcherWebSocketListener(AbstractWatchManager<String> manager, BlockingQueue<Object> queue, AtomicReference<WebSocket> webSocketRef, ObjectMapper objectMapper) {
super(manager, queue, webSocketRef);
public RawWatcherWebSocketListener(AbstractWatchManager<String> manager, ObjectMapper objectMapper) {
super(manager);
this.objectMapper = objectMapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
Expand All @@ -46,8 +44,8 @@ public WatchConnectionManager(final OkHttpClient client, final BaseOperation<T,

initRunner(new WebSocketClientRunner<T>(client) {
@Override
WatcherWebSocketListener<T> newListener(BlockingQueue<Object> queue, AtomicReference<WebSocket> webSocketRef) {
return new TypedWatcherWebSocketListener<>(WatchConnectionManager.this, queue, webSocketRef);
WatcherWebSocketListener<T> newListener() {
return new TypedWatcherWebSocketListener<>(WatchConnectionManager.this);
}

@Override
Expand All @@ -66,8 +64,8 @@ public WatchConnectionManager(final OkHttpClient client, final BaseOperation<T,
}

private static class TypedWatcherWebSocketListener<T extends HasMetadata> extends WatcherWebSocketListener<T> {
public TypedWatcherWebSocketListener(AbstractWatchManager<T> manager, BlockingQueue<Object> queue, AtomicReference<WebSocket> webSocketRef) {
super(manager, queue, webSocketRef);
public TypedWatcherWebSocketListener(AbstractWatchManager<T> manager) {
super(manager);
}

@Override
Expand Down Expand Up @@ -97,7 +95,7 @@ public void onMessage(WebSocket webSocket, String message) {

// The resource version no longer exists - this has to be handled by the caller.
if (status.getCode() == HTTP_GONE) {
webSocketRef.set(null); // lose the ref: closing in close() would only generate a Broken pipe
close();
// exception
// shut down executor, etc.
manager.closeEvent(new WatcherException(status.getMessage(), new KubernetesClientException(status)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,35 @@
*/
package io.fabric8.kubernetes.client.dsl.internal;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;

abstract class WatcherWebSocketListener<T> extends WebSocketListener {
protected static final Logger logger = LoggerFactory.getLogger(WatcherWebSocketListener.class);

protected final AtomicReference<WebSocket> webSocketRef;
/**
* True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started.
*/
private final AtomicBoolean started = new AtomicBoolean(false);
protected final AtomicReference<WebSocket> webSocketRef = new AtomicReference<>();

/**
* Blocking queue for startup exceptions.
*/
private final BlockingQueue<Object> queue;
private final CompletableFuture<Boolean> startedFuture = new CompletableFuture<>();
protected final AbstractWatchManager<T> manager;

protected WatcherWebSocketListener(AbstractWatchManager<T> manager, BlockingQueue<Object> queue, AtomicReference<WebSocket> webSocketRef) {
protected WatcherWebSocketListener(AbstractWatchManager<T> manager) {
this.manager = manager;
this.queue = queue;
this.webSocketRef = webSocketRef;
}

@Override
Expand All @@ -62,9 +54,10 @@ public void onOpen(final WebSocket webSocket, Response response) {
logger.debug("WebSocket successfully opened");
webSocketRef.set(webSocket);
manager.resetReconnectAttempts();
started.set(true);
queue.clear();
queue.add(true);
startedFuture.complete(true);
if (manager.isForceClosed()) {
close();
}
}

@Override
Expand All @@ -88,19 +81,14 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
closeBody(response);
return;
} else {
// We only need to queue startup failures.
Status status = OperationSupport.createStatus(response);
closeBody(response);
logger.warn("Exec Failure: HTTP {}, Status: {} - {}", code, status.getCode(), status.getMessage(), t);
if (!started.get()) {
pushException(new KubernetesClientException(status));
}
logger.warn("Exec Failure: HTTP {}, Status: {} - {}", code, status.getCode(), status.getMessage());
pushException(new KubernetesClientException(status));
}
} else {
logger.warn("Exec Failure", t);
if (!started.get()) {
pushException(new KubernetesClientException("Failed to start websocket", t));
}
logger.warn("Exec Failure {} {}", t.getClass().getName(), t.getMessage());
pushException(new KubernetesClientException("Failed to start websocket", t));
}

if (manager.cannotReconnect()) {
Expand All @@ -112,9 +100,8 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
}

private void pushException(KubernetesClientException exception) {
queue.clear();
if (!queue.offer(exception)) {
logger.debug("Couldn't add exception {} to queue", exception.getLocalizedMessage());
if (!startedFuture.completeExceptionally(exception)) {
logger.debug("Couldn't report exception", exception);
}
}

Expand Down Expand Up @@ -157,10 +144,18 @@ private void scheduleReconnect() {
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
webSocketRef.set(null);
manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e));
manager.close();
}
}, true);
}

protected void close() {
startedFuture.completeExceptionally(new IllegalStateException("already closed"));
AbstractWatchManager.closeWebSocket(webSocketRef.getAndSet(null));
}

protected void waitUntilReady() {
Utils.waitUntilReady(startedFuture, 10, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,43 @@
*/
package io.fabric8.kubernetes.client.dsl.internal;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;

abstract class WebSocketClientRunner<T> extends AbstractWatchManager.ClientRunner {
private final AtomicReference<WebSocket> webSocketRef = new AtomicReference<>();
private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);

protected WatcherWebSocketListener<T> listener;

protected WebSocketClientRunner(OkHttpClient client) {
super(client);
}

@Override
public void run(Request request) {
client().newWebSocket(request, newListener(queue, webSocketRef));
public synchronized void run(Request request) {
close();
this.listener = newListener();
client().newWebSocket(request, this.listener);
}
abstract WatcherWebSocketListener<T> newListener(BlockingQueue<Object> queue, AtomicReference<WebSocket> webSocketRef);

abstract WatcherWebSocketListener<T> newListener();

@Override
public void close() {
AbstractWatchManager.closeWebSocket(webSocketRef.getAndSet(null));
public synchronized void close() {
if (this.listener != null) {
listener.close();
}
}

@Override
public void waitUntilReady() {
Utils.waitUntilReady(queue, 10, TimeUnit.SECONDS);
WatcherWebSocketListener<T> current = getListener();
if (current == null) {
throw new IllegalStateException();
}
current.waitUntilReady();
}

synchronized WatcherWebSocketListener<T> getListener() {
return listener;
}
}
Loading

0 comments on commit c360a08

Please sign in to comment.