Skip to content

Commit

Permalink
Fix fabric8io#3721: Add support for uploading file via InputStream
Browse files Browse the repository at this point in the history
Refactor PodUpload.upload implementation to expose a method which will
recieve InputStream which can be reused by newly added
upload(InputStream) method

Signed-off-by: Rohan Kumar <[email protected]>
  • Loading branch information
rohanKanojia committed Jan 7, 2022
1 parent 69bf84e commit 8241789
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#### Dependency Upgrade

#### New Features
* Fix #3721: Add support for uploading file via InputStream

#### _**Note**_: Breaking changes in the API

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,24 @@
*/
package io.fabric8.kubernetes.client.dsl;

import java.io.InputStream;
import java.nio.file.Path;

public interface Uploadable<T> {

/**
* Upload file located at specified {@link Path} to Pod
*
* @param path path of the file which needs to be uploaded
* @return boolean value regarding upload was successful or not.
*/
T upload(Path path);

/**
* Upload file extracted from provided InputStream to Pod
*
* @param inputStream {@link InputStream} which will be uploaded
* @return boolean value regarding upload was successful or not.
*/
T upload(InputStream inputStream);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.net.URL;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
Expand Down Expand Up @@ -68,6 +69,7 @@
import io.fabric8.kubernetes.client.dsl.internal.LogWatchCallback;
import io.fabric8.kubernetes.client.dsl.internal.PodOperationContext;
import io.fabric8.kubernetes.client.utils.Base64;
import io.fabric8.kubernetes.client.utils.IOHelpers;
import io.fabric8.kubernetes.client.utils.PodOperationUtil;
import io.fabric8.kubernetes.client.dsl.internal.PortForwarderWebsocket;
import io.fabric8.kubernetes.client.dsl.internal.uploadable.PodUpload;
Expand Down Expand Up @@ -379,6 +381,18 @@ public Boolean copy(Path destination) {
}
}

@Override
public Boolean upload(InputStream inputStream) {
return wrapRunWithOptionalDependency(() -> {
try {
return PodUpload.uploadFile(httpClient, getContext(), this, inputStream);
} catch (Exception ex) {
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(ex);
}
}, "TarArchiveOutputStream is provided by commons-compress");
}

@Override
public Boolean upload(Path path) {
return wrapRunWithOptionalDependency(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,17 @@ public static boolean upload(HttpClient client, PodOperationContext context,
throw new IllegalArgumentException("Provided arguments are not valid (file, directory, path)");
}

private static boolean uploadFile(HttpClient client, PodOperationContext context,
OperationSupport operationSupport, Path pathToUpload)
public static boolean uploadFile(HttpClient client, PodOperationContext context,
OperationSupport operationSupport, InputStream inputStream)
throws IOException, InterruptedException {

final String file = context.getFile();
final String directory = file.substring(0, file.lastIndexOf('/'));
final String command = String.format(
"mkdir -p %s && base64 -d - > %s", shellQuote(directory), shellQuote(file));
final PodUploadWebSocketListener podUploadWebSocketListener = initWebSocket(
buildCommandUrl(command, context, operationSupport), client);
try (
final FileInputStream fis = new FileInputStream(pathToUpload.toFile());
final Base64.InputStream b64In = new Base64.InputStream(fis, Base64.ENCODE)
final Base64.InputStream b64In = new Base64.InputStream(inputStream, Base64.ENCODE)
) {
podUploadWebSocketListener.waitUntilReady(operationSupport.getConfig().getRequestConfig().getUploadConnectionTimeout());
copy(b64In, podUploadWebSocketListener::send);
Expand All @@ -91,6 +89,14 @@ private static boolean uploadFile(HttpClient client, PodOperationContext context
}
}

private static boolean uploadFile(HttpClient client, PodOperationContext context,
OperationSupport operationSupport, Path pathToUpload)
throws IOException, InterruptedException {
try (final FileInputStream fis = new FileInputStream(pathToUpload.toFile())) {
return uploadFile(client, context, operationSupport, fis);
}
}

private static boolean uploadDirectory(HttpClient client, PodOperationContext context,
OperationSupport operationSupport, Path pathToUpload)
throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.ObjIntConsumer;

import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
Expand All @@ -40,12 +43,15 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -58,6 +64,11 @@ class PodUploadTest {
private Path mockPathToUpload;
private WebSocket mockWebSocket;

@FunctionalInterface
public interface PodUploadTester<R> {
R apply() throws IOException, InterruptedException;
}

@BeforeEach
void setUp() throws IOException {
mockClient = Mockito.mock(HttpClient.class, Mockito.RETURNS_DEEP_STUBS);
Expand Down Expand Up @@ -89,34 +100,17 @@ void testUploadInvalidParametersShouldThrowException() throws Exception {
}

@Test
void testUploadFileHappyScenarioShouldUploadFile() throws Exception {
when(mockContext.getFile()).thenReturn("/mock/dir/file");
void testUploadFileHappyScenarioShouldUploadFile() throws IOException, InterruptedException {
when(mockPathToUpload.toFile())
.thenReturn(new File(PodUpload.class.getResource("/upload/upload-sample.txt").getFile()));
WebSocket.Builder builder = Mockito.mock(WebSocket.Builder.class, Mockito.RETURNS_SELF);
when(builder.buildAsync(any())).thenAnswer(newWebSocket -> {
final PodUploadWebSocketListener wsl = newWebSocket.getArgument(0, PodUploadWebSocketListener.class);
// Set ready status
wsl.onOpen(mockWebSocket);
wsl.onMessage(mockWebSocket, ByteBuffer.wrap(new byte[] {(byte) 0}));
// Set complete status
Mockito.doAnswer(close -> {
wsl.onClose(mockWebSocket, close.getArgument(0), close.getArgument(1));
return null;
}).when(mockWebSocket).sendClose(anyInt(), anyString());
return CompletableFuture.completedFuture(mockWebSocket);
});
when(mockClient.newWebSocketBuilder()).thenReturn(builder);

final boolean result = PodUpload.upload(mockClient, mockContext, operationSupport, mockPathToUpload);

assertThat(result, equalTo(true));
uploadFileAndVerify(() -> PodUpload.upload(mockClient, mockContext, operationSupport, mockPathToUpload));
verify(mockPathToUpload, atLeast(1)).toFile();
verify(builder, times(1)).uri(argThat(request -> {
assertThat(request.toString(), equalTo("https://openshift.com:8443/api/v1/namespaces/default/pods/mock-pod/exec?command=sh&command=-c&command=mkdir+-p+%27%2Fmock%2Fdir%27+%26%26+base64+-d+-+%3E+%27%2Fmock%2Fdir%2Ffile%27&stdin=true&stderr=true"));
return true;
}));
verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class));
}

@Test
void testUploadFileInputStreamScenarioShouldUploadFile() throws IOException, InterruptedException {
InputStream inputStream = new ByteArrayInputStream("test data".getBytes());
uploadFileAndVerify(() -> PodUpload.uploadFile(mockClient, mockContext, operationSupport, inputStream));
}

@Test
Expand Down Expand Up @@ -171,4 +165,31 @@ void testCopy() throws Exception {
PodUpload.copy(input, consumer);
}

void uploadFileAndVerify(PodUploadTester<Boolean> fileUploadMethodToTest) throws IOException, InterruptedException {
when(mockContext.getFile()).thenReturn("/mock/dir/file");
WebSocket.Builder builder = Mockito.mock(WebSocket.Builder.class, Mockito.RETURNS_SELF);
when(builder.buildAsync(any())).thenAnswer(newWebSocket -> {
final PodUploadWebSocketListener wsl = newWebSocket.getArgument(0, PodUploadWebSocketListener.class);
// Set ready status
wsl.onOpen(mockWebSocket);
wsl.onMessage(mockWebSocket, ByteBuffer.wrap(new byte[]{(byte) 0}));
// Set complete status
Mockito.doAnswer(close -> {
wsl.onClose(mockWebSocket, close.getArgument(0), close.getArgument(1));
return null;
}).when(mockWebSocket).sendClose(anyInt(), anyString());
return CompletableFuture.completedFuture(mockWebSocket);
});
when(mockClient.newWebSocketBuilder()).thenReturn(builder);

final boolean result = fileUploadMethodToTest.apply();

assertThat(result, equalTo(true));
verify(builder, times(1)).uri(argThat(request -> {
assertThat(request.toString(), equalTo("https://openshift.com:8443/api/v1/namespaces/default/pods/mock-pod/exec?command=sh&command=-c&command=mkdir+-p+%27%2Fmock%2Fdir%27+%26%26+base64+-d+-+%3E+%27%2Fmock%2Fdir%2Ffile%27&stdin=true&stderr=true"));
return true;
}));
verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class));
}

}

0 comments on commit 8241789

Please sign in to comment.