Skip to content

Commit

Permalink
mgmt, fix issues on appservice test (Azure#15761)
Browse files Browse the repository at this point in the history
  • Loading branch information
weidongxu-microsoft authored Sep 27, 2020
1 parent 56d2b47 commit e4229fb
Show file tree
Hide file tree
Showing 32 changed files with 230 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ public void warDeploy(File warFile) {
}

@Override
public Mono<Void> warDeployAsync(InputStream warFile) {
return warDeployAsync(warFile, null);
public Mono<Void> warDeployAsync(InputStream warFile, long length) {
return warDeployAsync(warFile, length, null);
}

@Override
public void warDeploy(InputStream warFile) {
warDeployAsync(warFile).block();
public void warDeploy(InputStream warFile, long length) {
warDeployAsync(warFile, length).block();
}

@Override
Expand All @@ -80,13 +80,13 @@ public void warDeploy(File warFile, String appName) {
}

@Override
public Mono<Void> warDeployAsync(InputStream warFile, String appName) {
return kuduClient.warDeployAsync(warFile, appName);
public Mono<Void> warDeployAsync(InputStream warFile, long length, String appName) {
return kuduClient.warDeployAsync(warFile, length, appName);
}

@Override
public void warDeploy(InputStream warFile, String appName) {
warDeployAsync(warFile, appName).block();
public void warDeploy(InputStream warFile, long length, String appName) {
warDeployAsync(warFile, length, appName).block();
}

@Override
Expand All @@ -95,13 +95,13 @@ public void zipDeploy(File zipFile) {
}

@Override
public void zipDeploy(InputStream zipFile) {
zipDeployAsync(zipFile).block();
public void zipDeploy(InputStream zipFile, long length) {
zipDeployAsync(zipFile, length).block();
}

@Override
public Mono<Void> zipDeployAsync(InputStream zipFile) {
return kuduClient.zipDeployAsync(zipFile).then(stopAsync()).then(startAsync());
public Mono<Void> zipDeployAsync(InputStream zipFile, long length) {
return kuduClient.zipDeployAsync(zipFile, length).then(stopAsync()).then(startAsync());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,13 +582,13 @@ public void zipDeploy(File zipFile) {
}

@Override
public Mono<Void> zipDeployAsync(InputStream zipFile) {
return kuduClient.zipDeployAsync(zipFile);
public Mono<Void> zipDeployAsync(InputStream zipFile, long length) {
return kuduClient.zipDeployAsync(zipFile, length);
}

@Override
public void zipDeploy(InputStream zipFile) {
zipDeployAsync(zipFile).block();
public void zipDeploy(InputStream zipFile, long length) {
zipDeployAsync(zipFile, length).block();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ public void zipDeploy(File zipFile) {
}

@Override
public void zipDeploy(InputStream zipFile) {
zipDeployAsync(zipFile).block();
public void zipDeploy(InputStream zipFile, long length) {
zipDeployAsync(zipFile, length).block();
}

@Override
public Mono<Void> zipDeployAsync(InputStream zipFile) {
return kuduClient.zipDeployAsync(zipFile);
public Mono<Void> zipDeployAsync(InputStream zipFile, long length) {
return kuduClient.zipDeployAsync(zipFile, length);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
import com.azure.resourcemanager.resources.fluentcore.policy.ProviderRegistrationPolicy;
import com.fasterxml.jackson.core.JsonParseException;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
Expand All @@ -46,6 +44,7 @@
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/** A client which interacts with Kudu service. */
class KuduClient {
Expand Down Expand Up @@ -123,16 +122,16 @@ private interface KuduService {
@Get("api/logstream")
Mono<StreamResponse> streamAllLogs(@HostParam("$host") String host);

@Headers({
"Content-Type: application/octet-stream",
"x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps warDeploy",
"x-ms-body-logging: false"
})
@Post("api/wardeploy")
Mono<Void> warDeploy(
@HostParam("$host") String host,
@BodyParam("application/octet-stream") byte[] warFile,
@QueryParam("name") String appName);
// @Headers({
// "Content-Type: application/octet-stream",
// "x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps warDeploy",
// "x-ms-body-logging: false"
// })
// @Post("api/wardeploy")
// Mono<Void> warDeploy(
// @HostParam("$host") String host,
// @BodyParam("application/octet-stream") byte[] warFile,
// @QueryParam("name") String appName);

@Headers({
"Content-Type: application/octet-stream",
Expand All @@ -146,15 +145,15 @@ Mono<Void> warDeploy(
@HeaderParam("content-length") long size,
@QueryParam("name") String appName);

@Headers({
"Content-Type: application/octet-stream",
"x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps zipDeploy",
"x-ms-body-logging: false"
})
@Post("api/zipdeploy")
Mono<Void> zipDeploy(
@HostParam("$host") String host,
@BodyParam("application/octet-stream") byte[] zipFile);
// @Headers({
// "Content-Type: application/octet-stream",
// "x-ms-logging-context: com.microsoft.azure.management.appservice.WebApps zipDeploy",
// "x-ms-body-logging: false"
// })
// @Post("api/zipdeploy")
// Mono<Void> zipDeploy(
// @HostParam("$host") String host,
// @BodyParam("application/octet-stream") byte[] zipFile);

@Headers({
"Content-Type: application/octet-stream",
Expand Down Expand Up @@ -256,13 +255,18 @@ private static int findByte(ByteBuffer byteBuffer, byte b) {
return index;
}

Mono<Void> warDeployAsync(InputStream warFile, String appName) {
InputStreamFlux flux = fluxFromInputStream(warFile);
if (flux.flux != null) {
return withRetry(service.warDeploy(host, flux.flux, flux.size, appName));
} else {
return withRetry(service.warDeploy(host, flux.bytes, appName));
}
// Mono<Void> warDeployAsync(InputStream warFile, String appName) {
// InputStreamFlux flux = fluxFromInputStream(warFile);
// if (flux.flux != null) {
// return withRetry(service.warDeploy(host, flux.flux, flux.size, appName));
// } else {
// return withRetry(service.warDeploy(host, flux.bytes, appName));
// }
// }

Mono<Void> warDeployAsync(InputStream warFile, long length, String appName) {
Flux<ByteBuffer> flux = FluxUtil.toFluxByteBuffer(warFile);
return withRetry(service.warDeploy(host, flux, length, appName));
}

Mono<Void> warDeployAsync(File warFile, String appName) throws IOException {
Expand All @@ -277,13 +281,18 @@ Mono<Void> warDeployAsync(File warFile, String appName) throws IOException {
}));
}

Mono<Void> zipDeployAsync(InputStream zipFile) {
InputStreamFlux flux = fluxFromInputStream(zipFile);
if (flux.flux != null) {
return withRetry(service.zipDeploy(host, flux.flux, flux.size));
} else {
return withRetry(service.zipDeploy(host, flux.bytes));
}
// Mono<Void> zipDeployAsync(InputStream zipFile) {
// InputStreamFlux flux = fluxFromInputStream(zipFile);
// if (flux.flux != null) {
// return withRetry(service.zipDeploy(host, flux.flux, flux.size));
// } else {
// return withRetry(service.zipDeploy(host, flux.bytes));
// }
// }

Mono<Void> zipDeployAsync(InputStream zipFile, long length) {
Flux<ByteBuffer> flux = FluxUtil.toFluxByteBuffer(zipFile);
return withRetry(service.zipDeploy(host, flux, length));
}

Mono<Void> zipDeployAsync(File zipFile) throws IOException {
Expand All @@ -298,40 +307,40 @@ Mono<Void> zipDeployAsync(File zipFile) throws IOException {
}));
}

private InputStreamFlux fluxFromInputStream(InputStream inputStream) {
try {
InputStreamFlux inputStreamFlux = new InputStreamFlux();
if (inputStream instanceof FileInputStream) {
inputStreamFlux.size = ((FileInputStream) inputStream).getChannel().size();
inputStreamFlux.flux = FluxUtil.toFluxByteBuffer(inputStream);
} else if (inputStream instanceof ByteArrayInputStream) {
inputStreamFlux.size = inputStream.available();
inputStreamFlux.flux = FluxUtil.toFluxByteBuffer(inputStream);
} else {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int nRead;
byte[] data = new byte[16384];
while ((nRead = inputStream.read(data, 0, data.length)) != -1) {
buffer.write(data, 0, nRead);
}
inputStreamFlux.bytes = buffer.toByteArray();
inputStreamFlux.size = inputStreamFlux.bytes.length;
}
return inputStreamFlux;
} catch (IOException e) {
throw logger.logExceptionAsError(new IllegalStateException(e));
}
}

private static class InputStreamFlux {
private Flux<ByteBuffer> flux;
private byte[] bytes;
private long size;
}
// private InputStreamFlux fluxFromInputStream(InputStream inputStream) {
// try {
// InputStreamFlux inputStreamFlux = new InputStreamFlux();
// if (inputStream instanceof FileInputStream) {
// inputStreamFlux.size = ((FileInputStream) inputStream).getChannel().size();
// inputStreamFlux.flux = FluxUtil.toFluxByteBuffer(inputStream);
// } else if (inputStream instanceof ByteArrayInputStream) {
// inputStreamFlux.size = inputStream.available();
// inputStreamFlux.flux = FluxUtil.toFluxByteBuffer(inputStream);
// } else {
// ByteArrayOutputStream buffer = new ByteArrayOutputStream();
// int nRead;
// byte[] data = new byte[16384];
// while ((nRead = inputStream.read(data, 0, data.length)) != -1) {
// buffer.write(data, 0, nRead);
// }
// inputStreamFlux.bytes = buffer.toByteArray();
// inputStreamFlux.size = inputStreamFlux.bytes.length;
// }
// return inputStreamFlux;
// } catch (IOException e) {
// throw logger.logExceptionAsError(new IllegalStateException(e));
// }
// }
//
// private static class InputStreamFlux {
// private Flux<ByteBuffer> flux;
// private byte[] bytes;
// private long size;
// }

private Mono<Void> withRetry(Mono<Void> observable) {
return observable
.retryWhen(
.retryWhen(Retry.withThrowable(
flux ->
flux
.zipWith(
Expand All @@ -348,7 +357,7 @@ private Mono<Void> withRetry(Mono<Void> observable) {
throw logger.logExceptionAsError(Exceptions.propagate(throwable));
}
})
.flatMap(i -> Mono.delay(Duration.ofSeconds(((long) i) * 10))));
.flatMap(i -> Mono.delay(Duration.ofSeconds(((long) i) * 10)))));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ public void warDeploy(File warFile) {
}

@Override
public Mono<Void> warDeployAsync(InputStream warFile) {
return warDeployAsync(warFile, null);
public Mono<Void> warDeployAsync(InputStream warFile, long length) {
return warDeployAsync(warFile, length, null);
}

@Override
public void warDeploy(InputStream warFile) {
warDeployAsync(warFile).block();
public void warDeploy(InputStream warFile, long length) {
warDeployAsync(warFile, length).block();
}

@Override
Expand All @@ -194,13 +194,13 @@ public void warDeploy(File warFile, String appName) {
}

@Override
public void warDeploy(InputStream warFile, String appName) {
warDeployAsync(warFile, appName).block();
public void warDeploy(InputStream warFile, long length, String appName) {
warDeployAsync(warFile, length, appName).block();
}

@Override
public Mono<Void> warDeployAsync(InputStream warFile, String appName) {
return kuduClient.warDeployAsync(warFile, appName);
public Mono<Void> warDeployAsync(InputStream warFile, long length, String appName) {
return kuduClient.warDeployAsync(warFile, length, appName);
}

@Override
Expand All @@ -218,13 +218,15 @@ public void zipDeploy(File zipFile) {
}

@Override
public Mono<Void> zipDeployAsync(InputStream zipFile) {
return kuduClient.zipDeployAsync(zipFile).then(WebAppImpl.this.stopAsync()).then(WebAppImpl.this.startAsync());
public Mono<Void> zipDeployAsync(InputStream zipFile, long length) {
return kuduClient.zipDeployAsync(zipFile, length)
.then(WebAppImpl.this.stopAsync())
.then(WebAppImpl.this.startAsync());
}

@Override
public void zipDeploy(InputStream zipFile) {
zipDeployAsync(zipFile).block();
public void zipDeploy(InputStream zipFile, long length) {
zipDeployAsync(zipFile, length).block();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,18 @@ public interface DeploymentSlot
* Deploys a WAR file onto the Azure specialized Tomcat on this web app.
*
* @param warFile the WAR file to upload
* @param length the length of the file
*/
void warDeploy(InputStream warFile);
void warDeploy(InputStream warFile, long length);

/**
* Deploys a WAR file onto the Azure specialized Tomcat on this web app.
*
* @param warFile the WAR file to upload
* @param length the length of the file
* @return a completable of the operation
*/
Mono<Void> warDeployAsync(InputStream warFile);
Mono<Void> warDeployAsync(InputStream warFile, long length);

/**
* Deploys a WAR file onto the Azure specialized Tomcat on this web app.
Expand All @@ -75,18 +77,20 @@ public interface DeploymentSlot
* Deploys a WAR file onto the Azure specialized Tomcat on this web app.
*
* @param warFile the WAR file to upload
* @param length the length of the file
* @param appName the name of the app, default to "ROOT" when not provided
*/
void warDeploy(InputStream warFile, String appName);
void warDeploy(InputStream warFile, long length, String appName);

/**
* Deploys a WAR file onto the Azure specialized Tomcat on this web app.
*
* @param warFile the WAR file to upload
* @param length the length of the file
* @param appName the name of the app, default to "ROOT" when not provided
* @return a completable of the operation
*/
Mono<Void> warDeployAsync(InputStream warFile, String appName);
Mono<Void> warDeployAsync(InputStream warFile, long length, String appName);

/**************************************************************
* Fluent interfaces to provision a deployment slot
Expand Down
Loading

0 comments on commit e4229fb

Please sign in to comment.