Skip to content

Commit

Permalink
refactoring to remove some of the code around pumpers
Browse files Browse the repository at this point in the history
this also helps clarify / remove calls to the utils
shutdownExecutorService
  • Loading branch information
shawkins committed May 25, 2021
1 parent 0f1e7bf commit aef6407
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 343 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
import io.fabric8.kubernetes.client.utils.InputStreamPumper;
import io.fabric8.kubernetes.client.utils.NonBlockingInputStreamPumper;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.Response;
import okhttp3.WebSocket;
Expand All @@ -46,7 +45,6 @@
import java.util.concurrent.atomic.AtomicReference;

import static io.fabric8.kubernetes.client.utils.Utils.closeQuietly;
import static io.fabric8.kubernetes.client.utils.Utils.shutdownExecutorService;

/**
* A {@link WebSocketListener} for exec operations.
Expand Down Expand Up @@ -79,7 +77,6 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc

private final AtomicReference<WebSocket> webSocketRef = new AtomicReference<>();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final InputStreamPumper pumper;

private final AtomicBoolean started = new AtomicBoolean(false);
private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
Expand All @@ -95,21 +92,6 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc

private ObjectMapper objectMapper;

@Deprecated
public ExecWebSocketListener(InputStream in, OutputStream out, OutputStream err, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, ExecListener listener) {
this(new Config(), in, out, err, inputPipe, outputPipe, errorPipe, listener);
}

@Deprecated
public ExecWebSocketListener(Config config, InputStream in, OutputStream out, OutputStream err, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, ExecListener listener) {
this(config, in, out, err, null, inputPipe, outputPipe, errorPipe, null, listener, null);
}

@Deprecated
public ExecWebSocketListener(Config config, InputStream in, OutputStream out, OutputStream err, OutputStream errChannel, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, PipedInputStream errorChannelPipe, ExecListener listener) {
this(config, in, out, err, errChannel, inputPipe, outputPipe, errorPipe, errorChannelPipe, listener, null);
}

public ExecWebSocketListener(Config config, InputStream in, OutputStream out, OutputStream err, OutputStream errChannel, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, PipedInputStream errorChannelPipe, ExecListener listener, Integer bufferSize) {
this.config = config;
this.listener = listener;
Expand All @@ -122,13 +104,6 @@ public ExecWebSocketListener(Config config, InputStream in, OutputStream out, Ou
this.output = outputPipe;
this.error = errorPipe;
this.errorChannel = errorChannelPipe;
this.pumper = new NonBlockingInputStreamPumper(this.in, data -> {
try {
send(data);
} catch (Exception e) {
//
}
});
this.objectMapper = new ObjectMapper();
}

Expand All @@ -150,7 +125,7 @@ private void close(WebSocket ws, int code, String reason) {

/**
* Performs the cleanup tasks:
* 1. closes the InputStream pumper
* 1. cancels the InputStream pumper
* 2. closes all internally managed closeables (piped streams).
*
* The order of these tasks can't change or its likely that the pumper will throw errors,
Expand All @@ -161,12 +136,8 @@ private void cleanUpOnce() {
return;
}

try {
closeQuietly(pumper);
shutdownExecutorService(executorService);
} finally {
closeQuietly(toClose);
}
executorService.shutdownNow();
closeQuietly(toClose);
}

private void closeWebSocketOnce(int code, String reason) {
Expand Down Expand Up @@ -206,7 +177,8 @@ public void onOpen(WebSocket webSocket, Response response) {

webSocketRef.set(webSocket);
if (!executorService.isShutdown()) {
executorService.submit(pumper);
// the task will be cancelled via shutdownNow
InputStreamPumper.pump(InputStreamPumper.asNonBlocking(in), this::send, executorService);
started.set(true);
queue.add(true);
}
Expand Down Expand Up @@ -326,13 +298,13 @@ public void resize(int cols, int rows) {
map.put(HEIGHT, rows);
map.put(WIDTH, cols);
byte[] bytes = objectMapper.writeValueAsBytes(map);
send(bytes, (byte) 4);
send(bytes, 0, bytes.length, (byte) 4);
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
}

private void send(byte[] bytes,byte flag) throws IOException {
private void send(byte[] bytes, int offset, int length, byte flag) throws IOException {
if (bytes.length > 0) {
WebSocket ws = webSocketRef.get();
if (ws != null) {
Expand All @@ -344,8 +316,8 @@ private void send(byte[] bytes,byte flag) throws IOException {
}
}

private void send(byte[] bytes) throws IOException {
send(bytes,(byte)0);
private void send(byte[] bytes, int offset, int length) throws IOException {
send(bytes, offset, length, (byte)0);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.utils.BlockingInputStreamPumper;
import io.fabric8.kubernetes.client.utils.InputStreamPumper;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.Call;
Expand All @@ -36,13 +35,13 @@
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.fabric8.kubernetes.client.utils.Utils.closeQuietly;
import static io.fabric8.kubernetes.client.utils.Utils.shutdownExecutorService;

public class LogWatchCallback implements LogWatch, Callback, AutoCloseable {

Expand All @@ -58,8 +57,6 @@ public class LogWatchCallback implements LogWatch, Callback, AutoCloseable {
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicBoolean closed = new AtomicBoolean(false);

private InputStreamPumper pumper;

@Deprecated
public LogWatchCallback(OutputStream out) {
this(new Config(), out);
Expand Down Expand Up @@ -96,23 +93,19 @@ public void close() {

/**
* Performs the cleanup tasks:
* 1. closes the InputStream pumper
* 1. cancels the InputStream pumper
* 2. closes all internally managed closeables (piped streams).
*
* The order of these tasks can't change or its likely that the pumper will through errors,
* if the stream it uses closes before the pumper it self.
*/
private void cleanUp() {
try {
if (!closed.compareAndSet(false, true)) {
return;
}

closeQuietly(pumper);
shutdownExecutorService(executorService);
} finally {
closeQuietly(toClose);
if (!closed.compareAndSet(false, true)) {
return;
}

executorService.shutdownNow();
closeQuietly(toClose);
}

public void waitUntilReady() {
Expand Down Expand Up @@ -144,19 +137,12 @@ public void onFailure(Call call, IOException ioe) {

@Override
public void onResponse(Call call, final Response response) throws IOException {
pumper = new BlockingInputStreamPumper(response.body().byteStream(), input -> {
try {
out.write(input);
} catch (IOException e) {
throw KubernetesClientException.launderThrowable(e);
}
}, () -> {
cleanUp();
response.close();
});

if (!executorService.isShutdown()) {
executorService.submit(pumper);
// the task will be cancelled via shutdownNow
InputStreamPumper.pump(response.body().byteStream(), out::write, executorService).whenComplete((o, t) -> {
cleanUp();
response.close();
});
started.set(true);
queue.add(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static io.fabric8.kubernetes.client.utils.OptionalDependencyWrapper.wrapRunWithOptionalDependency;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -29,7 +28,9 @@
import java.net.URL;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -41,7 +42,6 @@
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.policy.v1beta1.Eviction;
import io.fabric8.kubernetes.api.model.policy.v1beta1.EvictionBuilder;
import io.fabric8.kubernetes.client.Callback;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.LocalPortForward;
Expand Down Expand Up @@ -71,7 +71,6 @@
import io.fabric8.kubernetes.client.utils.PodOperationUtil;
import io.fabric8.kubernetes.client.dsl.internal.PortForwarderWebsocket;
import io.fabric8.kubernetes.client.dsl.internal.uploadable.PodUpload;
import io.fabric8.kubernetes.client.utils.BlockingInputStreamPumper;
import io.fabric8.kubernetes.client.utils.URLUtils;
import io.fabric8.kubernetes.client.utils.Utils;
import io.fabric8.kubernetes.api.builder.VisitableBuilder;
Expand Down Expand Up @@ -474,22 +473,8 @@ public void run() {
String filename = parts[parts.length - 1];
destination = destination.toPath().resolve(filename).toFile();
}
try (InputStream is = readFile(source);
OutputStream os = new FileOutputStream(destination)) {
BlockingInputStreamPumper pumper = new BlockingInputStreamPumper(is, input -> {
try {
os.write(input);
} catch (IOException e) {
throw KubernetesClientException.launderThrowable(e);
}
}, () -> {
try {
os.flush();
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
});
pumper.run();
try (InputStream is = readFile(source);) {
Files.copy(is, destination.toPath(), StandardCopyOption.REPLACE_EXISTING);
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
Expand Down Expand Up @@ -547,6 +532,7 @@ private void copyDir(String source, File target) throws Exception {
//Let's wrap the code to a runnable inner class to avoid NoClassDef on Option classes.
try {
new Runnable() {

public void run() {
File destination = target;
if (!destination.isDirectory() && !destination.mkdirs())
Expand Down Expand Up @@ -575,24 +561,10 @@ public void run() {
if (!parent.isDirectory() && !parent.mkdirs()) {
throw new IOException("Failed to create directory: " + f);
}
try (OutputStream fs = new FileOutputStream(f)) {
BlockingInputStreamPumper pumper = new BlockingInputStreamPumper(tis, new Callback<byte[]>() {
@Override
public void call(byte[] input) {
try {
fs.write(input);
} catch (IOException e) {
throw KubernetesClientException.launderThrowable(e);
}
}
}, () -> {
try {
fs.close();
} catch (IOException e) {
throw KubernetesClientException.launderThrowable(e);
}
});
pumper.run();
try {
Files.copy(tis, f.toPath(), StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
throw KubernetesClientException.launderThrowable(e);
}
}
}
Expand Down

This file was deleted.

Loading

0 comments on commit aef6407

Please sign in to comment.