Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactoring to remove some of the code around pumpers #3169

Merged
merged 2 commits into from
Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@

##### Util Changes:
- #3197 `Utils.waitUntilReady` now accepts a Future, rather than a BlockingQueue
- #3169 `Utils.shutdownExecutorService` removed in favor of direct usage of shutdownNow where appropriate.
The stream pumper related classes were also simplified to utility methods on InputStreamPumper.

### 5.4.1 (2021-06-01)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ public void close() {
connectionPool.evictAll();
}

Utils.shutdownExecutorService(executorService);
if (executorService != null) {
executorService.shutdownNow();
}
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
import io.fabric8.kubernetes.client.utils.InputStreamPumper;
import io.fabric8.kubernetes.client.utils.NonBlockingInputStreamPumper;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.Response;
import okhttp3.WebSocket;
Expand All @@ -46,7 +45,6 @@
import java.util.concurrent.atomic.AtomicReference;

import static io.fabric8.kubernetes.client.utils.Utils.closeQuietly;
import static io.fabric8.kubernetes.client.utils.Utils.shutdownExecutorService;

/**
* A {@link WebSocketListener} for exec operations.
Expand Down Expand Up @@ -79,7 +77,6 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc

private final AtomicReference<WebSocket> webSocketRef = new AtomicReference<>();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final InputStreamPumper pumper;

private final CompletableFuture<Void> startedFuture = new CompletableFuture<>();
private final ExecListener listener;
Expand All @@ -94,21 +91,6 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc

private ObjectMapper objectMapper;

@Deprecated
public ExecWebSocketListener(InputStream in, OutputStream out, OutputStream err, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, ExecListener listener) {
this(new Config(), in, out, err, inputPipe, outputPipe, errorPipe, listener);
}

@Deprecated
public ExecWebSocketListener(Config config, InputStream in, OutputStream out, OutputStream err, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, ExecListener listener) {
this(config, in, out, err, null, inputPipe, outputPipe, errorPipe, null, listener, null);
}

@Deprecated
public ExecWebSocketListener(Config config, InputStream in, OutputStream out, OutputStream err, OutputStream errChannel, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, PipedInputStream errorChannelPipe, ExecListener listener) {
this(config, in, out, err, errChannel, inputPipe, outputPipe, errorPipe, errorChannelPipe, listener, null);
}

public ExecWebSocketListener(Config config, InputStream in, OutputStream out, OutputStream err, OutputStream errChannel, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, PipedInputStream errorChannelPipe, ExecListener listener, Integer bufferSize) {
this.config = config;
this.listener = listener;
Expand All @@ -121,13 +103,6 @@ public ExecWebSocketListener(Config config, InputStream in, OutputStream out, Ou
this.output = outputPipe;
this.error = errorPipe;
this.errorChannel = errorChannelPipe;
this.pumper = new NonBlockingInputStreamPumper(this.in, data -> {
try {
send(data);
} catch (Exception e) {
//
}
});
this.objectMapper = new ObjectMapper();
}

Expand All @@ -149,7 +124,7 @@ private void close(WebSocket ws, int code, String reason) {

/**
* Performs the cleanup tasks:
* 1. closes the InputStream pumper
* 1. cancels the InputStream pumper
* 2. closes all internally managed closeables (piped streams).
*
* The order of these tasks can't change or its likely that the pumper will throw errors,
Expand All @@ -160,12 +135,8 @@ private void cleanUpOnce() {
return;
}

try {
closeQuietly(pumper);
shutdownExecutorService(executorService);
} finally {
closeQuietly(toClose);
}
executorService.shutdownNow();
closeQuietly(toClose);
}

private void closeWebSocketOnce(int code, String reason) {
Expand Down Expand Up @@ -205,7 +176,8 @@ public void onOpen(WebSocket webSocket, Response response) {

webSocketRef.set(webSocket);
if (!executorService.isShutdown()) {
executorService.submit(pumper);
// the task will be cancelled via shutdownNow
InputStreamPumper.pump(InputStreamPumper.asInterruptible(in), this::send, executorService);
startedFuture.complete(null);
}
} catch (IOException e) {
Expand Down Expand Up @@ -321,26 +293,26 @@ public void resize(int cols, int rows) {
map.put(HEIGHT, rows);
map.put(WIDTH, cols);
byte[] bytes = objectMapper.writeValueAsBytes(map);
send(bytes, (byte) 4);
send(bytes, 0, bytes.length, (byte) 4);
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
}

private void send(byte[] bytes,byte flag) throws IOException {
if (bytes.length > 0) {
private void send(byte[] bytes, int offset, int length, byte flag) {
if (length > 0) {
WebSocket ws = webSocketRef.get();
if (ws != null) {
byte[] toSend = new byte[bytes.length + 1];
byte[] toSend = new byte[length + 1];
toSend[0] = flag;
System.arraycopy(bytes, 0, toSend, 1, bytes.length);
System.arraycopy(bytes, offset, toSend, 1, length);
ws.send(ByteString.of(toSend));
}
}
}

private void send(byte[] bytes) throws IOException {
send(bytes,(byte)0);
private void send(byte[] bytes, int offset, int length) {
send(bytes, offset, length, (byte)0);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.utils.BlockingInputStreamPumper;
import io.fabric8.kubernetes.client.utils.InputStreamPumper;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.Call;
Expand All @@ -42,7 +41,6 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static io.fabric8.kubernetes.client.utils.Utils.closeQuietly;
import static io.fabric8.kubernetes.client.utils.Utils.shutdownExecutorService;

public class LogWatchCallback implements LogWatch, Callback, AutoCloseable {

Expand All @@ -57,8 +55,6 @@ public class LogWatchCallback implements LogWatch, Callback, AutoCloseable {
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicBoolean closed = new AtomicBoolean(false);

private InputStreamPumper pumper;

@Deprecated
public LogWatchCallback(OutputStream out) {
this(new Config(), out);
Expand Down Expand Up @@ -95,23 +91,19 @@ public void close() {

/**
* Performs the cleanup tasks:
* 1. closes the InputStream pumper
* 1. cancels the InputStream pumper
* 2. closes all internally managed closeables (piped streams).
*
* The order of these tasks can't change or its likely that the pumper will through errors,
* if the stream it uses closes before the pumper it self.
*/
private void cleanUp() {
try {
if (!closed.compareAndSet(false, true)) {
return;
}

closeQuietly(pumper);
shutdownExecutorService(executorService);
} finally {
closeQuietly(toClose);
if (!closed.compareAndSet(false, true)) {
return;
}

executorService.shutdownNow();
closeQuietly(toClose);
}

public void waitUntilReady() {
Expand Down Expand Up @@ -140,19 +132,12 @@ public void onFailure(Call call, IOException ioe) {

@Override
public void onResponse(Call call, final Response response) throws IOException {
pumper = new BlockingInputStreamPumper(response.body().byteStream(), input -> {
try {
out.write(input);
} catch (IOException e) {
throw KubernetesClientException.launderThrowable(e);
}
}, () -> {
cleanUp();
response.close();
});

if (!executorService.isShutdown()) {
executorService.submit(pumper);
// the task will be cancelled via shutdownNow
InputStreamPumper.pump(response.body().byteStream(), out::write, executorService).whenComplete((o, t) -> {
cleanUp();
response.close();
});
startedFuture.complete(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static io.fabric8.kubernetes.client.utils.OptionalDependencyWrapper.wrapRunWithOptionalDependency;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -29,7 +28,9 @@
import java.net.URL;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -41,7 +42,6 @@
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.policy.v1beta1.Eviction;
import io.fabric8.kubernetes.api.model.policy.v1beta1.EvictionBuilder;
import io.fabric8.kubernetes.client.Callback;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.LocalPortForward;
Expand Down Expand Up @@ -71,7 +71,6 @@
import io.fabric8.kubernetes.client.utils.PodOperationUtil;
import io.fabric8.kubernetes.client.dsl.internal.PortForwarderWebsocket;
import io.fabric8.kubernetes.client.dsl.internal.uploadable.PodUpload;
import io.fabric8.kubernetes.client.utils.BlockingInputStreamPumper;
import io.fabric8.kubernetes.client.utils.URLUtils;
import io.fabric8.kubernetes.client.utils.Utils;
import io.fabric8.kubernetes.api.builder.VisitableBuilder;
Expand Down Expand Up @@ -474,22 +473,8 @@ public void run() {
String filename = parts[parts.length - 1];
destination = destination.toPath().resolve(filename).toFile();
}
try (InputStream is = readFile(source);
OutputStream os = new FileOutputStream(destination)) {
BlockingInputStreamPumper pumper = new BlockingInputStreamPumper(is, input -> {
try {
os.write(input);
} catch (IOException e) {
throw KubernetesClientException.launderThrowable(e);
}
}, () -> {
try {
os.flush();
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
});
pumper.run();
try (InputStream is = readFile(source);) {
Files.copy(is, destination.toPath(), StandardCopyOption.REPLACE_EXISTING);
shawkins marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
Expand Down Expand Up @@ -575,25 +560,7 @@ public void run() {
if (!parent.isDirectory() && !parent.mkdirs()) {
throw new IOException("Failed to create directory: " + f);
}
try (OutputStream fs = new FileOutputStream(f)) {
BlockingInputStreamPumper pumper = new BlockingInputStreamPumper(tis, new Callback<byte[]>() {
@Override
public void call(byte[] input) {
try {
fs.write(input);
} catch (IOException e) {
throw KubernetesClientException.launderThrowable(e);
}
}
}, () -> {
try {
fs.close();
} catch (IOException e) {
throw KubernetesClientException.launderThrowable(e);
}
});
pumper.run();
}
Files.copy(tis, f.toPath(), StandardCopyOption.REPLACE_EXISTING);
shawkins marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down
Loading