diff --git a/ChangeLog.txt b/ChangeLog.txt index 96abc6d10607..ef40f7e203ed 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,2 +1,8 @@ +2.0.0-beta2 (2018-04-23) +- Major refinements to HTTP content streaming, in large part thanks to contributions by [David Moten](https://github.com/davidmoten). +- Removed Joda Time in favor of Java 8 DateTime classes +- NettyClient.Factory now accepts a Netty Bootstrap object allowing for more user configuration of channel attributes, such as the receive buffer size and low/high write watermarks. Currently, specifying an EventLoopGroup or `Class` is not supported. +- Various other minor improvements + 2.0.0-beta1 (2018-03-08) - First beta featuring Netty and RxJava 2. diff --git a/README.md b/README.md index 7696bbf7e0c2..0f6494e2284d 100644 --- a/README.md +++ b/README.md @@ -33,12 +33,30 @@ The runtime libraries for [AutoRest](https://github.com/azure/autorest) generate 2.0.0-beta1 - + + + - com.microsoft.rest.v2 - client-runtime-native - 2.0.0-SNAPSHOT - pom + io.netty + netty-tcnative-boringssl-static + 2.0.8.Final + ${os.detected.classifier} + + + + + io.netty + netty-transport-native-epoll + 4.1.23.Final + ${os.detected.classifier} + + + + + io.netty + netty-transport-native-kqueue + 4.1.23.Final + ${os.detected.classifier} ``` diff --git a/azure-client-authentication/src/main/java/com/microsoft/azure/v2/credentials/ApplicationTokenCredentials.java b/azure-client-authentication/src/main/java/com/microsoft/azure/v2/credentials/ApplicationTokenCredentials.java index 2e0c2af1f5cd..e907e8e7a2fb 100644 --- a/azure-client-authentication/src/main/java/com/microsoft/azure/v2/credentials/ApplicationTokenCredentials.java +++ b/azure-client-authentication/src/main/java/com/microsoft/azure/v2/credentials/ApplicationTokenCredentials.java @@ -6,18 +6,19 @@ package com.microsoft.azure.v2.credentials; -import com.google.common.io.BaseEncoding; import com.microsoft.aad.adal4j.AsymmetricKeyCredential; import com.microsoft.aad.adal4j.AuthenticationContext; import com.microsoft.aad.adal4j.AuthenticationException; import com.microsoft.aad.adal4j.AuthenticationResult; import com.microsoft.aad.adal4j.ClientCredential; import com.microsoft.azure.v2.AzureEnvironment; +import com.microsoft.rest.v2.util.Base64Util; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.security.KeyFactory; import java.security.NoSuchAlgorithmException; import java.security.PrivateKey; @@ -171,7 +172,7 @@ private PrivateKey privateKeyFromPem(String pem) { .replace("-----END PRIVATE KEY-----", "") .replace("\n", "") .replace("\r", ""); - byte[] key = BaseEncoding.base64().decode(base64); + byte[] key = Base64Util.decode(base64.getBytes(StandardCharsets.UTF_8)); PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(key); try { KeyFactory kf = KeyFactory.getInstance("RSA"); diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/v2/AzureProxy.java b/azure-client-runtime/src/main/java/com/microsoft/azure/v2/AzureProxy.java index 23ebbf630998..a8de76559119 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/v2/AzureProxy.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/v2/AzureProxy.java @@ -26,8 +26,6 @@ import com.microsoft.rest.v2.policy.CookiePolicyFactory; import com.microsoft.rest.v2.policy.CredentialsPolicyFactory; import com.microsoft.rest.v2.policy.DecodingPolicyFactory; -import com.microsoft.rest.v2.policy.HttpLogDetailLevel; -import com.microsoft.rest.v2.policy.HttpLoggingPolicyFactory; import com.microsoft.rest.v2.policy.RequestPolicyFactory; import com.microsoft.rest.v2.policy.RetryPolicyFactory; import com.microsoft.rest.v2.protocol.SerializerAdapter; @@ -184,7 +182,6 @@ public static HttpPipeline createDefaultPipeline(Class swaggerInterface, Requ if (credentialsPolicy != null) { builder.withRequestPolicy(credentialsPolicy); } - builder.withRequestPolicy(new HttpLoggingPolicyFactory(HttpLogDetailLevel.HEADERS)); return builder.build(); } diff --git a/client-runtime-native/pom.xml b/client-runtime-native/pom.xml deleted file mode 100644 index 68b4efe11ce3..000000000000 --- a/client-runtime-native/pom.xml +++ /dev/null @@ -1,102 +0,0 @@ - - - com.microsoft.rest.v2 - client-runtime-native - - 4.0.0 - - autorest-clientruntime-for-java - com.microsoft.azure.v2 - 2.0.0-SNAPSHOT - - pom - - Native Modules for AutoRest Client Runtime for Java - - This package contains native dependencies for the AutoRest Client Runtime for Java which can improve performance. - Including this package in your dependencies will cause all native modules available on your OS to be used. - Currently, only x86_64 architectures are supported. - - https://github.com/Azure/autorest-clientruntime-for-java - - - 2.0.7.Final - - - - - kr.motd.maven - os-maven-plugin - 1.5.0.Final - - - - - - boringssl - - true - - - - - io.netty - netty-tcnative-boringssl-static - ${netty-boringssl.version} - ${os.detected.name}-${os.detected.arch} - - - - - - osx - - - os.detected.name - osx - - - - - - io.netty - netty-transport-native-kqueue - ${netty.version} - ${os.detected.name}-${os.detected.arch} - - - - io.netty - netty-tcnative-boringssl-static - ${netty-boringssl.version} - ${os.detected.name}-${os.detected.arch} - - - - - - linux - - - os.detected.name - linux - - - - - io.netty - netty-transport-native-epoll - ${netty.version} - ${os.detected.name}-${os.detected.arch} - - - - io.netty - netty-tcnative-boringssl-static - ${netty-boringssl.version} - ${os.detected.name}-${os.detected.arch} - - - - - \ No newline at end of file diff --git a/client-runtime/pom.xml b/client-runtime/pom.xml index bbf3cc569c4e..a4a8b1564207 100644 --- a/client-runtime/pom.xml +++ b/client-runtime/pom.xml @@ -71,10 +71,6 @@ com.fasterxml.jackson.dataformat jackson-dataformat-xml - - org.apache.commons - commons-lang3 - io.reactivex.rxjava2 rxjava @@ -87,6 +83,7 @@ com.microsoft.azure azure-annotations + junit junit @@ -97,13 +94,6 @@ slf4j-simple test - - com.microsoft.rest.v2 - client-runtime-native - 2.0.0-SNAPSHOT - pom - test - com.github.tomakehurst wiremock-standalone diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/Base64Url.java b/client-runtime/src/main/java/com/microsoft/rest/v2/Base64Url.java index e8a37d9d68f8..9e859c506f26 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/Base64Url.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/Base64Url.java @@ -6,7 +6,7 @@ package com.microsoft.rest.v2; -import com.google.common.io.BaseEncoding; +import com.microsoft.rest.v2.util.Base64Util; import java.util.Arrays; @@ -73,7 +73,7 @@ public static Base64Url encode(byte[] bytes) { if (bytes == null) { return new Base64Url((String) null); } else { - return new Base64Url(BaseEncoding.base64Url().omitPadding().encode(bytes)); + return new Base64Url(Base64Util.encodeURLWithoutPadding(bytes)); } } @@ -95,8 +95,8 @@ public byte[] decodedBytes() { if (this.bytes == null) { return null; } - final String bytesString = new String(bytes); - final byte[] decodedBytes = BaseEncoding.base64Url().decode(bytesString); + + final byte[] decodedBytes = Base64Util.decodeURL(bytes); return decodedBytes; } diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/RestProxy.java b/client-runtime/src/main/java/com/microsoft/rest/v2/RestProxy.java index 1c6a01d006fd..93c6bde901a9 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/RestProxy.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/RestProxy.java @@ -36,7 +36,6 @@ import io.reactivex.Single; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; -import org.reactivestreams.Publisher; import java.io.IOException; import java.lang.reflect.Constructor; @@ -48,7 +47,6 @@ import java.net.URL; import java.nio.ByteBuffer; import java.util.Map; -import java.util.NoSuchElementException; /** * This class can be used to create a proxy implementation for a provided Swagger generated @@ -409,16 +407,17 @@ private Single handleRestResponseReturnTypeAsync(HttpResponse response, Swagg final int responseStatusCode = response.statusCode(); try { - final Single asyncResult; + Single asyncResult; if (entityTypeToken.isSubtypeOf(RestResponse.class)) { - final Constructor> responseConstructor = getRestResponseConstructor(entityTypeToken); + Constructor> responseConstructor = getRestResponseConstructor(entityTypeToken); - final Type[] deserializedTypes = getTypeArguments(entityTypeToken.getSupertype(RestResponse.class).getType()); - final Type bodyType = deserializedTypes[1]; - final HttpHeaders responseHeaders = response.headers(); - final Object deserializedHeaders = response.deserializedHeaders(); + Type[] deserializedTypes = getTypeArguments(entityTypeToken.getSupertype(RestResponse.class).getType()); - final TypeToken bodyTypeToken = TypeToken.of(bodyType); + HttpHeaders responseHeaders = response.headers(); + Object deserializedHeaders = response.deserializedHeaders(); + + Type bodyType = deserializedTypes[1]; + TypeToken bodyTypeToken = TypeToken.of(bodyType); if (bodyTypeToken.isSubtypeOf(Void.class)) { asyncResult = response.body().lastElement().ignoreElement() .andThen(Single.just(responseConstructor.newInstance(responseStatusCode, deserializedHeaders, responseHeaders.toMap(), null))); @@ -426,17 +425,22 @@ private Single handleRestResponseReturnTypeAsync(HttpResponse response, Swagg final Map rawHeaders = responseHeaders.toMap(); asyncResult = handleBodyReturnTypeAsync(response, methodParser, bodyType) - .map(new Function>() { - @Override - public RestResponse apply(Object body) throws Exception { - return responseConstructor.newInstance(responseStatusCode, deserializedHeaders, rawHeaders, body); - } - }).toSingle(responseConstructor.newInstance(responseStatusCode, deserializedHeaders, rawHeaders, null)); + .map((Function>) body -> responseConstructor.newInstance(responseStatusCode, deserializedHeaders, rawHeaders, body)) + .toSingle(responseConstructor.newInstance(responseStatusCode, deserializedHeaders, rawHeaders, null)); + } + + Type headersType = deserializedTypes[0]; + if (!response.isDecoded() && !TypeToken.of(headersType).isSubtypeOf(Void.class)) { + asyncResult = asyncResult.toCompletable().andThen(Single.error(new RestException( + "No deserialized headers were found. Please add a DecodingPolicyFactory to the HttpPipeline.", + response, + (Object) null))); } } else { // For now we're just throwing if the Maybe didn't emit a value. asyncResult = handleBodyReturnTypeAsync(response, methodParser, entityType).toSingle(); } + return asyncResult; } catch (ReflectiveOperationException e) { throw new RuntimeException(e); @@ -457,16 +461,16 @@ protected final Maybe handleBodyReturnTypeAsync(final HttpResponse response, } else if (entityTypeToken.isSubtypeOf(byte[].class)) { Maybe responseBodyBytesAsync = response.bodyAsByteArray().toMaybe(); if (returnValueWireType == Base64Url.class) { - responseBodyBytesAsync = responseBodyBytesAsync.map(new Function() { - @Override - public byte[] apply(byte[] base64UrlBytes) { - return new Base64Url(base64UrlBytes).decodedBytes(); - } - }); + responseBodyBytesAsync = responseBodyBytesAsync.map(base64UrlBytes -> new Base64Url(base64UrlBytes).decodedBytes()); } asyncResult = responseBodyBytesAsync; } else if (FlowableUtil.isFlowableByteBuffer(entityTypeToken)) { asyncResult = Maybe.just(response.body()); + } else if (!response.isDecoded()) { + asyncResult = Maybe.error(new RestException( + "No deserialized response body was found. Please add a DecodingPolicyFactory to the HttpPipeline.", + response, + (Object) null)); } else { Object result = response.deserializedBody(); if (result == null) { @@ -488,17 +492,6 @@ protected Object handleResumeOperation(HttpRequest httpRequest, OperationDescrip throw new Exception("The resume operation is not avaiable in the base RestProxy class."); } - private static final Function> WARN_MISSING_DECODING = new Function>() { - @Override - public Single apply(Throwable throwable) throws Exception { - if (throwable instanceof NoSuchElementException) { - return Single.error(new IllegalStateException("No decoded response body was found. DecodingPolicyFactory may be missing from the pipeline.", throwable)); - } else { - return Single.error(throwable); - } - } - }; - /** * Handle the provided asynchronous HTTP response and return the deserialized value. * @param httpRequest The original HTTP request. @@ -519,23 +512,14 @@ public final Object handleRestReturnType(HttpRequest httpRequest, Single>() { - @Override - public Single apply(HttpResponse response) throws Exception { - return handleRestResponseReturnTypeAsync(response, methodParser, singleTypeParam); - } - }).onErrorResumeNext(WARN_MISSING_DECODING); + result = asyncExpectedResponse.flatMap(response -> + handleRestResponseReturnTypeAsync(response, methodParser, singleTypeParam)); } else if (returnTypeToken.isSubtypeOf(Observable.class)) { throw new InvalidReturnTypeException("RestProxy does not support swagger interface methods (such as " + methodParser.fullyQualifiedMethodName() + "()) with a return type of " + returnType.toString()); } else if (FlowableUtil.isFlowableByteBuffer(returnTypeToken)) { - result = asyncExpectedResponse.flatMapPublisher(new Function>() { - @Override - public Publisher apply(HttpResponse httpResponse) throws Exception { - return httpResponse.body(); - } - }); + result = asyncExpectedResponse.flatMapPublisher(HttpResponse::body); } else if (returnTypeToken.isSubtypeOf(void.class) || returnTypeToken.isSubtypeOf(Void.class)) { asyncExpectedResponse.blockingGet(); @@ -544,12 +528,9 @@ else if (returnTypeToken.isSubtypeOf(void.class) || returnTypeToken.isSubtypeOf( // The return value is not an asynchronous type (Completable, Single, or Observable), so // block the deserialization until a value is received. result = asyncExpectedResponse - .flatMap(new Function>() { - @Override - public Single apply(HttpResponse httpResponse) throws Exception { - return handleRestResponseReturnTypeAsync(httpResponse, methodParser, returnType); - } - }).onErrorResumeNext(WARN_MISSING_DECODING).blockingGet(); + .flatMap(httpResponse -> + handleRestResponseReturnTypeAsync(httpResponse, methodParser, returnType)) + .blockingGet(); } return result; diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/credentials/BasicAuthenticationCredentials.java b/client-runtime/src/main/java/com/microsoft/rest/v2/credentials/BasicAuthenticationCredentials.java index 1682f1cfbb90..adfcb25bf7f7 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/credentials/BasicAuthenticationCredentials.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/credentials/BasicAuthenticationCredentials.java @@ -6,7 +6,8 @@ package com.microsoft.rest.v2.credentials; -import com.google.common.io.BaseEncoding; +import com.microsoft.rest.v2.util.Base64Util; + import java.io.UnsupportedEncodingException; /** @@ -39,7 +40,7 @@ public String authorizationHeaderValue(String uri) { String credential = userName + ":" + password; String encodedCredential; try { - encodedCredential = BaseEncoding.base64().encode(credential.getBytes("UTF8")); + encodedCredential = Base64Util.encodeToString(credential.getBytes("UTF8")); } catch (UnsupportedEncodingException e) { // The encoding is hard-coded, so if it's unsupported, it needs to be fixed right here. throw new RuntimeException(e); diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/http/BufferedHttpResponse.java b/client-runtime/src/main/java/com/microsoft/rest/v2/http/BufferedHttpResponse.java index 4a2ecd08528b..fd1d6754431d 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/http/BufferedHttpResponse.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/http/BufferedHttpResponse.java @@ -8,8 +8,6 @@ import io.reactivex.Flowable; import io.reactivex.Single; -import io.reactivex.functions.Function; -import org.reactivestreams.Publisher; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -19,7 +17,7 @@ */ public final class BufferedHttpResponse extends HttpResponse { private final HttpResponse innerHttpResponse; - private Single body; + private final Single cachedBody; /** * Creates a buffered HTTP response. @@ -27,7 +25,7 @@ public final class BufferedHttpResponse extends HttpResponse { */ public BufferedHttpResponse(HttpResponse innerHttpResponse) { this.innerHttpResponse = innerHttpResponse; - this.body = null; + this.cachedBody = innerHttpResponse.bodyAsByteArray().cache(); } @Override @@ -47,38 +45,18 @@ public HttpHeaders headers() { @Override public Single bodyAsByteArray() { - if (body == null) { - body = innerHttpResponse.bodyAsByteArray() - .map(new Function() { - @Override - public byte[] apply(byte[] bytes) { - body = Single.just(bytes); - return bytes; - } - }); - } - return body; + return cachedBody; } @Override public Flowable body() { - return bodyAsByteArray().flatMapPublisher(new Function>() { - @Override - public Publisher apply(byte[] bytes) throws Exception { - return Flowable.just(ByteBuffer.wrap(bytes)); - } - }); + return bodyAsByteArray().flatMapPublisher(bytes -> Flowable.just(ByteBuffer.wrap(bytes))); } @Override public Single bodyAsString() { return bodyAsByteArray() - .map(new Function() { - @Override - public String apply(byte[] bytes) { - return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8); - } - }); + .map(bytes -> bytes == null ? null : new String(bytes, StandardCharsets.UTF_8)); } @Override @@ -86,6 +64,16 @@ public BufferedHttpResponse buffer() { return this; } + @Override + public boolean isDecoded() { + return innerHttpResponse.isDecoded(); + } + + @Override + public boolean withIsDecoded(boolean isDecoded) { + return innerHttpResponse.withIsDecoded(isDecoded); + } + @Override public Object deserializedHeaders() { return innerHttpResponse.deserializedHeaders(); diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/http/HttpPipeline.java b/client-runtime/src/main/java/com/microsoft/rest/v2/http/HttpPipeline.java index 2e9933805a05..16d5a524669f 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/http/HttpPipeline.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/http/HttpPipeline.java @@ -12,6 +12,8 @@ import com.microsoft.rest.v2.policy.RequestPolicyOptions; import io.reactivex.Single; +import java.util.Arrays; + /** * A collection of RequestPolicies that will be applied to a HTTP request before it is sent and will * be applied to a HTTP response when it is received. @@ -65,6 +67,15 @@ public Single sendRequestAsync(HttpRequest httpRequest) { return requestPolicy.sendAsync(httpRequest); } + /** + * Build a new HttpPipeline that will use the provided RequestPolicy factories. + * @param requestPolicyFactories The RequestPolicy factories to use. + * @return The built HttpPipeline. + */ + public static HttpPipeline build(Iterable requestPolicyFactories) { + return build(null, requestPolicyFactories); + } + /** * Build a new HttpPipeline that will use the provided RequestPolicy factories. * @param requestPolicyFactories The RequestPolicy factories to use. @@ -91,6 +102,16 @@ public static HttpPipeline build(HttpClient httpClient, RequestPolicyFactory... * @return The built HttpPipeline. */ public static HttpPipeline build(HttpPipelineOptions pipelineOptions, RequestPolicyFactory... requestPolicyFactories) { + return build(pipelineOptions, Arrays.asList(requestPolicyFactories)); + } + + /** + * Build a new HttpPipeline that will use the provided HttpClient and RequestPolicy factories. + * @param pipelineOptions The optional properties that can be set on the created HttpPipeline. + * @param requestPolicyFactories The RequestPolicy factories to use. + * @return The built HttpPipeline. + */ + public static HttpPipeline build(HttpPipelineOptions pipelineOptions, Iterable requestPolicyFactories) { final HttpPipelineBuilder builder = new HttpPipelineBuilder(pipelineOptions); if (requestPolicyFactories != null) { for (final RequestPolicyFactory requestPolicyFactory : requestPolicyFactories) { diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/http/HttpPipelineBuilder.java b/client-runtime/src/main/java/com/microsoft/rest/v2/http/HttpPipelineBuilder.java index 9b223664972e..3b8bd8ad205a 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/http/HttpPipelineBuilder.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/http/HttpPipelineBuilder.java @@ -220,10 +220,12 @@ public HttpPipelineBuilder withRequestIdPolicy() { * Adds a RequestPolicy which retries a failed request up to the given * number of times. * @param maxRetries The maximum number of times to retry failed requests. + * @param delayTime the delay between retries + * @param timeUnit the time unit of the delay * @return This HttpPipeline builder. */ - public HttpPipelineBuilder withRetryPolicy(int maxRetries) { - return withRequestPolicy(new RetryPolicyFactory(maxRetries)); + public HttpPipelineBuilder withRetryPolicy(int maxRetries, long delayTime, TimeUnit timeUnit) { + return withRequestPolicy(new RetryPolicyFactory(maxRetries, delayTime, timeUnit)); } /** diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/http/HttpResponse.java b/client-runtime/src/main/java/com/microsoft/rest/v2/http/HttpResponse.java index 62b84253c60d..5c53a343a671 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/http/HttpResponse.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/http/HttpResponse.java @@ -12,6 +12,8 @@ import java.io.Closeable; import java.nio.ByteBuffer; +import com.microsoft.rest.v2.policy.DecodingPolicyFactory; + /** * This class contains all of the details necessary for reacting to a HTTP response from a * HttpResponse. @@ -19,6 +21,7 @@ public abstract class HttpResponse implements Closeable { private Object deserializedHeaders; private Object deserializedBody; + private boolean isDecoded; /** * Get this response object's HTTP status code. @@ -100,6 +103,24 @@ public void close() { // no-op } + /** + * Returns a value indicating whether this HttpResponse has been decoded by a {@link DecodingPolicyFactory}. + * @return whether this HttpResponse has been decoded + */ + public boolean isDecoded() { + return isDecoded; + } + + /** + * Sets the flag indicating whether this HttpResponse has been decoded by a {@link DecodingPolicyFactory}. + * @param isDecoded whether this HttpResponse has been decoded + * @return this HTTP repsonse + */ + public boolean withIsDecoded(boolean isDecoded) { + this.isDecoded = isDecoded; + return isDecoded; + } + /** * @return the deserialized headers, if present. Otherwise, null. */ diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/http/NettyClient.java b/client-runtime/src/main/java/com/microsoft/rest/v2/http/NettyClient.java index 0fc7934fcff3..c0587581e02f 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/http/NettyClient.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/http/NettyClient.java @@ -153,9 +153,8 @@ private static TransportConfig loadTransport(int groupSize) { return result; } - private static SharedChannelPool createChannelPool(final NettyAdapter adapter, TransportConfig config, + private static SharedChannelPool createChannelPool(Bootstrap bootstrap, TransportConfig config, int poolSize) { - Bootstrap bootstrap = new Bootstrap(); bootstrap.group(config.eventLoopGroup); bootstrap.channel(config.channelClass); bootstrap.option(ChannelOption.AUTO_READ, false); @@ -163,7 +162,9 @@ private static SharedChannelPool createChannelPool(final NettyAdapter adapter, T bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) TimeUnit.MINUTES.toMillis(3L)); return new SharedChannelPool(bootstrap, new AbstractChannelPoolHandler() { @Override - public void channelCreated(Channel ch) throws Exception { + public synchronized void channelCreated(Channel ch) throws Exception { + // Why is it necessary to have "synchronized" to prevent NRE in pipeline().get(Class)? + // Is channelCreated not run on the eventLoop assigned to the channel? ch.pipeline().addLast("HttpResponseDecoder", new HttpResponseDecoder()); ch.pipeline().addLast("HttpRequestEncoder", new HttpRequestEncoder()); ch.pipeline().addLast("HttpClientInboundHandler", new HttpClientInboundHandler()); @@ -174,13 +175,13 @@ public void channelCreated(Channel ch) throws Exception { private NettyAdapter() { TransportConfig config = loadTransport(0); this.eventLoopGroup = config.eventLoopGroup; - this.channelPool = createChannelPool(this, config, eventLoopGroup.executorCount() * 16); + this.channelPool = createChannelPool(new Bootstrap(), config, eventLoopGroup.executorCount() * 16); } - private NettyAdapter(int eventLoopGroupSize, int channelPoolSize) { + private NettyAdapter(Bootstrap baseBootstrap, int eventLoopGroupSize, int channelPoolSize) { TransportConfig config = loadTransport(eventLoopGroupSize); this.eventLoopGroup = config.eventLoopGroup; - this.channelPool = createChannelPool(this, config, channelPoolSize); + this.channelPool = createChannelPool(baseBootstrap, config, channelPoolSize); } private Single sendRequestInternalAsync(final HttpRequest request, final Proxy proxy) { @@ -312,14 +313,14 @@ private final class RequestSubscriber implements FlowableSubscriber, private boolean done; private final HttpClientInboundHandler inboundHandler; - + /** - * Ensures that requests are only made of upstream once the last write has completed + * Ensures that requests are only made of upstream once the last write has completed * and the channel can be written to synchronously (when isWritable is false writes - * are buffered). + * are buffered). */ private final AtomicInteger writing = new AtomicInteger(); - + //states for `writing` private static final int WRITE_COMPLETED_WRITABLE = 0; private static final int WRITING_WRITABLE = 1; @@ -342,11 +343,26 @@ public void onNext(ByteBuffer buf) { if (done) { return; } + try { writing(); - channel - .writeAndFlush(Unpooled.wrappedBuffer(buf)) - .addListener(this); + // Always dispatching writes on the event loop prevents data corruption on macOS. + // Since channel.write always dispatches to the event loop itself if needed internally, + // it seems fine to do it here. + channel.eventLoop().execute(() -> { + try { + channel + .write(Unpooled.wrappedBuffer(buf)) + .addListener(this); + } catch (Exception e) { + subscription.cancel(); + onError(e); + } + writeComplete(); + if (writing.get() == WRITE_COMPLETED_NOT_WRITABLE) { + channel.flush(); + } + }); } catch (Exception e) { subscription.cancel(); onError(e); @@ -383,11 +399,9 @@ public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { subscription.cancel(); emitError(future.cause()); - } else { - writeComplete(); } } - + private void writing() { while (true) { int s = writing.get(); @@ -404,7 +418,7 @@ private void writing() { } } } - + private void writeComplete() { while (true) { int s = writing.get(); @@ -458,17 +472,19 @@ void channelWritable(boolean writable) { } private void writeRequest(final DefaultHttpRequest raw) { - channel // - .write(raw) // - .addListener((Future future) -> { - if (!future.isSuccess()) { - emitError(future.cause()); - } - }); + channel.eventLoop().execute(() -> + channel // + .write(raw) // + .addListener((Future future) -> { + if (!future.isSuccess()) { + emitError(future.cause()); + } + }) + ); } private void writeBodyEnd() { - channel // + channel.eventLoop().execute(() -> channel // .writeAndFlush(DefaultLastHttpContent.EMPTY_LAST_CONTENT) // .addListener((Future future) -> { if (future.isSuccess()) { @@ -479,7 +495,7 @@ private void writeBodyEnd() { } else { emitError(future.cause()); } - }); + })); } private boolean transition(int from, int to) { @@ -487,8 +503,6 @@ private boolean transition(int from, int to) { } void emitError(Throwable throwable) { - // TODO remove printStackTrace - throwable.printStackTrace(); while (true) { int s = state.get(); if (s == ACQUIRING_NOT_DISPOSED) { @@ -517,7 +531,7 @@ void emitError(Throwable throwable) { * Returns false if and only if content subscription should be immediately * cancelled. * - * @param content + * @param content the content that was subscribed to * @return false if and only if content subscription should be immediately * cancelled */ @@ -601,7 +615,9 @@ public void dispose() { } private void closeAndReleaseChannel() { - channelPool.closeAndRelease(channel); + if (channel != null) { + channelPool.closeAndRelease(channel); + } } @Override @@ -992,13 +1008,15 @@ public Factory() { * Create a Netty client factory, specifying the event loop group size and the * channel pool size. * + * @param baseBootstrap + * a channel Bootstrap to use as a basis for channel creation * @param eventLoopGroupSize * the number of event loop executors * @param channelPoolSize * the number of pooled channels (connections) */ - public Factory(int eventLoopGroupSize, int channelPoolSize) { - this.adapter = new NettyAdapter(eventLoopGroupSize, channelPoolSize); + public Factory(Bootstrap baseBootstrap, int eventLoopGroupSize, int channelPoolSize) { + this.adapter = new NettyAdapter(baseBootstrap.clone(), eventLoopGroupSize, channelPoolSize); } @Override diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/http/SharedChannelPool.java b/client-runtime/src/main/java/com/microsoft/rest/v2/http/SharedChannelPool.java index d5fbaeeb425c..eb8e3290f2fa 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/http/SharedChannelPool.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/http/SharedChannelPool.java @@ -16,7 +16,6 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; import io.reactivex.exceptions.Exceptions; @@ -138,16 +137,14 @@ protected void initChannel(Channel ch) throws Exception { } leased.put(request.uri, channelFuture.channel()); - channelFuture.addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - if (channelFuture.isSuccess()) { - handler.channelAcquired(channelFuture.channel()); - request.promise.setSuccess(channelFuture.channel()); - } else { - leased.remove(request.uri, channelFuture.channel()); - request.promise.setFailure(channelFuture.cause()); - } + channelFuture.addListener((ChannelFuture future) -> { + if (future.isSuccess()) { + handler.channelAcquired(future.channel()); + request.promise.setSuccess(future.channel()); + } else { + leased.remove(request.uri, future.channel()); + + request.promise.setFailure(future.cause()); } }); } diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/policy/AddDatePolicyFactory.java b/client-runtime/src/main/java/com/microsoft/rest/v2/policy/AddDatePolicyFactory.java new file mode 100644 index 000000000000..f998f2d2bf8b --- /dev/null +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/policy/AddDatePolicyFactory.java @@ -0,0 +1,47 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for + * license information. + */ + +package com.microsoft.rest.v2.policy; + +import com.microsoft.rest.v2.http.HttpRequest; +import com.microsoft.rest.v2.http.HttpResponse; +import io.reactivex.Single; + +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Locale; + +/** + * Creates a RequestPolicy which adds a Date header in RFC 1123 format when sending an HTTP request. + */ +public final class AddDatePolicyFactory implements RequestPolicyFactory { + @Override + public RequestPolicy create(RequestPolicy next, RequestPolicyOptions options) { + return new AddDatePolicy(next); + } + + private static final class AddDatePolicy implements RequestPolicy { + private final DateTimeFormatter format = DateTimeFormatter + .ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'") + .withZone(ZoneId.of("UTC")) + .withLocale(Locale.US); + + private final RequestPolicy next; + + AddDatePolicy(RequestPolicy next) { + this.next = next; + } + + @Override + public Single sendAsync(HttpRequest request) { + return Single.defer(() -> { + request.headers().set("Date", format.format(OffsetDateTime.now())); + return next.sendAsync(request); + }); + } + } +} diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/policy/DecodingPolicyFactory.java b/client-runtime/src/main/java/com/microsoft/rest/v2/policy/DecodingPolicyFactory.java index 3ab22422377a..e5abd0486a1b 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/policy/DecodingPolicyFactory.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/policy/DecodingPolicyFactory.java @@ -10,8 +10,6 @@ import com.microsoft.rest.v2.http.HttpResponse; import com.microsoft.rest.v2.protocol.HttpResponseDecoder; import io.reactivex.Single; -import io.reactivex.SingleSource; -import io.reactivex.functions.Function; /** * Creates a RequestPolicy which decodes the response body and headers. @@ -30,15 +28,12 @@ private DecodingPolicy(RequestPolicy next) { @Override public Single sendAsync(final HttpRequest request) { - return next.sendAsync(request).flatMap(new Function>() { - @Override - public SingleSource apply(final HttpResponse response) throws Exception { - HttpResponseDecoder decoder = request.responseDecoder(); - if (decoder != null) { - return request.responseDecoder().decode(response); - } else { - return Single.error(new NullPointerException("HttpRequest.responseDecoder() was null when decoding.")); - } + return next.sendAsync(request).flatMap(response -> { + HttpResponseDecoder decoder = request.responseDecoder(); + if (decoder != null) { + return request.responseDecoder().decode(response); + } else { + return Single.error(new NullPointerException("HttpRequest.responseDecoder() was null when decoding.")); } }); } diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/policy/ProxyAuthenticationPolicyFactory.java b/client-runtime/src/main/java/com/microsoft/rest/v2/policy/ProxyAuthenticationPolicyFactory.java index 3b5aad585365..4da9598decd8 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/policy/ProxyAuthenticationPolicyFactory.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/policy/ProxyAuthenticationPolicyFactory.java @@ -6,9 +6,9 @@ package com.microsoft.rest.v2.policy; -import com.google.common.io.BaseEncoding; import com.microsoft.rest.v2.http.HttpRequest; import com.microsoft.rest.v2.http.HttpResponse; +import com.microsoft.rest.v2.util.Base64Util; import io.reactivex.Single; import java.nio.charset.StandardCharsets; @@ -45,7 +45,7 @@ private ProxyAuthenticationPolicy(RequestPolicy next) { @Override public Single sendAsync(HttpRequest request) { String auth = username + ":" + password; - String encodedAuth = BaseEncoding.base64().encode(auth.getBytes(StandardCharsets.UTF_8)); + String encodedAuth = Base64Util.encodeToString(auth.getBytes(StandardCharsets.UTF_8)); request.withHeader("Proxy-Authentication", "Basic " + encodedAuth); return next.sendAsync(request); } diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/policy/RetryPolicyFactory.java b/client-runtime/src/main/java/com/microsoft/rest/v2/policy/RetryPolicyFactory.java index 950ea0f5ec51..eb84a648d101 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/policy/RetryPolicyFactory.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/policy/RetryPolicyFactory.java @@ -12,27 +12,38 @@ import io.reactivex.functions.Function; import java.net.HttpURLConnection; +import java.util.concurrent.TimeUnit; /** * Creates a RequestPolicy which retries when a recoverable HTTP error occurs. */ public class RetryPolicyFactory implements RequestPolicyFactory { private static final int DEFAULT_MAX_RETRIES = 3; + private static final int DEFAULT_DELAY = 0; + private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; private final int maxRetries; + private final long delayTime; + private final TimeUnit timeUnit; /** - * Creates a Factory with the default number of retry attempts. + * Creates a RetryPolicyFactory with the default number of retry attempts and delay between retries. */ public RetryPolicyFactory() { maxRetries = DEFAULT_MAX_RETRIES; + delayTime = DEFAULT_DELAY; + timeUnit = DEFAULT_TIME_UNIT; } /** - * Creates a Factory. + * Creates a RetryPolicyFactory. * @param maxRetries The maximum number of retries to attempt. + * @param delayTime the delay between retries + * @param timeUnit the time unit of the delay */ - public RetryPolicyFactory(int maxRetries) { + public RetryPolicyFactory(int maxRetries, long delayTime, TimeUnit timeUnit) { this.maxRetries = maxRetries; + this.delayTime = delayTime; + this.timeUnit = timeUnit; } @Override @@ -52,20 +63,17 @@ public Single sendAsync(final HttpRequest request) { } private Single attemptAsync(final HttpRequest request, final int tryCount) { - Single result = next.sendAsync(request.buffer()) - .flatMap(new Function>() { - @Override - public Single apply(HttpResponse httpResponse) throws Exception { - Single result; - if (shouldRetry(httpResponse, tryCount)) { - result = attemptAsync(request, tryCount + 1); - } else { - result = Single.just(httpResponse); - } - return result; + return next.sendAsync(request.buffer()) + .flatMap((Function>) httpResponse -> { + if (shouldRetry(httpResponse, tryCount)) { + return attemptAsync(request, tryCount + 1).delaySubscription(delayTime, timeUnit); + } else { + return Single.just(httpResponse); } - }); - return result; + }).onErrorResumeNext(err -> + tryCount < maxRetries + ? attemptAsync(request, tryCount + 1).delaySubscription(delayTime, timeUnit) + : Single.error(err)); } private boolean shouldRetry(HttpResponse response, int tryCount) { @@ -77,4 +85,4 @@ private boolean shouldRetry(HttpResponse response, int tryCount) { && code != HttpURLConnection.HTTP_VERSION)); } } -} \ No newline at end of file +} diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/protocol/HttpResponseDecoder.java b/client-runtime/src/main/java/com/microsoft/rest/v2/protocol/HttpResponseDecoder.java index ff42f3c02296..5ee14c0409a7 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/protocol/HttpResponseDecoder.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/protocol/HttpResponseDecoder.java @@ -59,6 +59,8 @@ public HttpResponseDecoder(SwaggerMethodParser methodParser, SerializerAdapter decode(final HttpResponse response) { + response.withIsDecoded(true); + final Object deserializedHeaders; try { deserializedHeaders = deserializeHeaders(response.headers()); diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/util/Base64Util.java b/client-runtime/src/main/java/com/microsoft/rest/v2/util/Base64Util.java new file mode 100644 index 000000000000..4c629bbd5935 --- /dev/null +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/util/Base64Util.java @@ -0,0 +1,71 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for + * license information. + */ + +package com.microsoft.rest.v2.util; + +import java.util.Base64; + +/** + * Provides Base64 encoding/decoding methods. + */ +public final class Base64Util { + /** + * Encodes a byte array to base64. + * @param src the byte array to encode + * @return the base64 encoded bytes + */ + public static byte[] encode(byte[] src) { + return src == null ? null : Base64.getEncoder().encode(src); + } + + /** + * Encodes a byte array to base64 URL format. + * @param src the byte array to encode + * @return the base64 URL encoded bytes + */ + public static byte[] encodeURLWithoutPadding(byte[] src) { + return src == null ? null : Base64.getUrlEncoder().withoutPadding().encode(src); + } + + /** + * Encodes a byte array to a base 64 string. + * @param src the byte array to encode + * @return the base64 encoded string + */ + public static String encodeToString(byte[] src) { + return src == null ? null : Base64.getEncoder().encodeToString(src); + } + + /** + * Decodes a base64 encoded byte array. + * @param encoded the byte array to decode + * @return the decoded byte array + */ + public static byte[] decode(byte[] encoded) { + return encoded == null ? null : Base64.getDecoder().decode(encoded); + } + + /** + * Decodes a byte array in base64 URL format. + * @param src the byte array to decode + * @return the decoded byte array + */ + public static byte[] decodeURL(byte[] src) { + return src == null ? null : Base64.getUrlDecoder().decode(src); + } + + /** + * Decodes a base64 encoded string. + * @param encoded the string to decode + * @return the decoded byte array + */ + public static byte[] decodeString(String encoded) { + return encoded == null ? null : Base64.getDecoder().decode(encoded); + } + + private Base64Util() { + } +} diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/util/FlowableUtil.java b/client-runtime/src/main/java/com/microsoft/rest/v2/util/FlowableUtil.java index c03cc217a927..f2f1abc992cd 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/util/FlowableUtil.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/util/FlowableUtil.java @@ -6,20 +6,14 @@ package com.microsoft.rest.v2.util; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; import com.google.common.reflect.TypeToken; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.reactivex.Completable; -import io.reactivex.CompletableEmitter; -import io.reactivex.CompletableOnSubscribe; -import io.reactivex.Emitter; import io.reactivex.Flowable; import io.reactivex.FlowableSubscriber; import io.reactivex.Single; -import io.reactivex.functions.BiConsumer; -import io.reactivex.functions.BiFunction; -import io.reactivex.functions.Function; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; import org.reactivestreams.Subscriber; @@ -31,7 +25,6 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -62,19 +55,19 @@ public static boolean isFlowableByteBuffer(TypeToken entityTypeToken) { * @return A Single which emits the concatenation of all the byte buffers given by the source Flowable. */ public static Single collectBytesInArray(Flowable content) { - return content.collectInto(ByteStreams.newDataOutput(), new BiConsumer() { - @Override - public void accept(ByteArrayDataOutput out, ByteBuffer chunk) throws Exception { - // TODO: Would be nice to reduce copying here - byte[] arrayChunk = new byte[chunk.remaining()]; - chunk.get(arrayChunk); - out.write(arrayChunk); - } - }).map(new Function() { - @Override - public byte[] apply(ByteArrayDataOutput out) throws Exception { - return out.toByteArray(); + return content.collectInto(Unpooled.buffer(), ByteBuf::writeBytes).map(out -> { + try { + if (out.array().length == out.readableBytes()) { + return out.array(); + } else { + byte[] arr = new byte[out.readableBytes()]; + out.readBytes(arr); + return arr; + } + } finally { + out.release(); } + }); } @@ -84,116 +77,141 @@ public byte[] apply(ByteArrayDataOutput out) throws Exception { * @return A Single which emits the concatenation of all the byte buffers given by the source Flowable. */ public static Single collectBytesInBuffer(Flowable content) { - return collectBytesInArray(content) - .map(new Function() { - @Override - public ByteBuffer apply(byte[] bytes) throws Exception { - return ByteBuffer.wrap(bytes); - } - }); + return collectBytesInArray(content).map(ByteBuffer::wrap); } /** * Writes the bytes emitted by a Flowable to an AsynchronousFileChannel. + * * @param content the Flowable content - * @param fileChannel the file channel + * @param outFile the file channel * @return a Completable which performs the write operation when subscribed */ - public static Completable writeFile(final Flowable content, final AsynchronousFileChannel fileChannel) { - return Completable.create(new CompletableOnSubscribe() { - @Override - public void subscribe(final CompletableEmitter emitter) throws Exception { - content.subscribe(new FlowableSubscriber() { - // volatile ensures that writes to these fields by one thread will be immediately visible to other threads. - // An I/O pool thread will write to isWriting and read isCompleted, - // while another thread may read isWriting and write to isCompleted. - volatile boolean isWriting = false; - volatile boolean isCompleted = false; - volatile Subscription subscription; - volatile long position = 0; - - @Override - public void onSubscribe(Subscription s) { - subscription = s; - s.request(1); - } + public static Completable writeFile(Flowable content, AsynchronousFileChannel outFile) { + return writeFile(content, outFile, 0); + } - @Override - public void onNext(ByteBuffer bytes) { - isWriting = true; - fileChannel.write(bytes, position, null, onWriteCompleted); - } + /** + * Writes the bytes emitted by a Flowable to an AsynchronousFileChannel + * starting at the given position in the file. + * + * @param content the Flowable content + * @param outFile the file channel + * @param position the position in the file to begin writing + * @return a Completable which performs the write operation when subscribed + */ + public static Completable writeFile(Flowable content, AsynchronousFileChannel outFile, long position) { + return Completable.create(emitter -> content.subscribe(new FlowableSubscriber() { + // volatile ensures that writes to these fields by one thread will be immediately visible to other threads. + // An I/O pool thread will write to isWriting and read isCompleted, + // while another thread may read isWriting and write to isCompleted. + volatile boolean isWriting = false; + volatile boolean isCompleted = false; + volatile Subscription subscription; + volatile long pos = position; + @Override + public void onSubscribe(Subscription s) { + subscription = s; + s.request(1); + } - CompletionHandler onWriteCompleted = new CompletionHandler() { - @Override - public void completed(Integer bytesWritten, Object attachment) { - isWriting = false; - if (isCompleted) { - emitter.onComplete(); - } - //noinspection NonAtomicOperationOnVolatileField - position += bytesWritten; - subscription.request(1); - } + @Override + public void onNext(ByteBuffer bytes) { + isWriting = true; + outFile.write(bytes, pos, null, onWriteCompleted); + } - @Override - public void failed(Throwable exc, Object attachment) { - subscription.cancel(); - emitter.onError(exc); - } - }; - @Override - public void onError(Throwable throwable) { - subscription.cancel(); - emitter.onError(throwable); + CompletionHandler onWriteCompleted = new CompletionHandler() { + @Override + public void completed(Integer bytesWritten, Object attachment) { + isWriting = false; + if (isCompleted) { + emitter.onComplete(); } + //noinspection NonAtomicOperationOnVolatileField + pos += bytesWritten; + subscription.request(1); + } - @Override - public void onComplete() { - isCompleted = true; - if (!isWriting) { - emitter.onComplete(); - } - } - }); + @Override + public void failed(Throwable exc, Object attachment) { + subscription.cancel(); + emitter.onError(exc); + } + }; + + @Override + public void onError(Throwable throwable) { + subscription.cancel(); + emitter.onError(throwable); } - }); + + @Override + public void onComplete() { + isCompleted = true; + if (!isWriting) { + emitter.onComplete(); + } + } + })); + } + + /** + * Creates a {@link Flowable} from an {@link AsynchronousFileChannel} + * which reads part of a file into chunks of the given size. + * + * @param fileChannel The file channel. + * @param chunkSize the size of file chunks to read. + * @param offset The offset in the file to begin reading. + * @param length The number of bytes to read from the file. + * @return the Flowable. + */ + public static Flowable readFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length) { + return new FileReadFlowable(fileChannel, chunkSize, offset, length); } /** - * Creates a {@link Flowable} from an {@link AsynchronousFileChannel} which reads part of a file. + * Creates a {@link Flowable} from an {@link AsynchronousFileChannel} + * which reads part of a file. * * @param fileChannel The file channel. * @param offset The offset in the file to begin reading. * @param length The number of bytes to read from the file. * @return the Flowable. */ - public static Flowable readFile(final AsynchronousFileChannel fileChannel, final long offset, final long length) { - return new FileReadFlowable(fileChannel, offset, length); + public static Flowable readFile(AsynchronousFileChannel fileChannel, long offset, long length) { + return readFile(fileChannel, DEFAULT_CHUNK_SIZE, offset, length); } /** - * Creates a {@link Flowable} from an {@link AsynchronousFileChannel} which reads the entire file. + * Creates a {@link Flowable} from an {@link AsynchronousFileChannel} + * which reads the entire file. + * * @param fileChannel The file channel. - * @throws IOException if an error occurs when determining file size * @return The AsyncInputStream. */ - public static Flowable readFile(AsynchronousFileChannel fileChannel) throws IOException { - long size = fileChannel.size(); - return readFile(fileChannel, 0, size); + public static Flowable readFile(AsynchronousFileChannel fileChannel) { + try { + long size = fileChannel.size(); + return readFile(fileChannel, DEFAULT_CHUNK_SIZE, 0, size); + } catch (IOException e) { + return Flowable.error(e); + } } - private static final int CHUNK_SIZE = 8192; + private static final int DEFAULT_CHUNK_SIZE = 1024 * 64; private static final class FileReadFlowable extends Flowable { private final AsynchronousFileChannel fileChannel; + private final int chunkSize; private final long offset; private final long length; - FileReadFlowable(AsynchronousFileChannel fileChannel, long offset, long length) { + FileReadFlowable(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length) { this.fileChannel = fileChannel; + this.chunkSize = chunkSize; this.offset = offset; this.length = length; } @@ -291,7 +309,7 @@ private void drain() { private void doRead() { // use local variable to limit volatile reads long pos = position; - ByteBuffer innerBuf = ByteBuffer.allocate(Math.min(CHUNK_SIZE, maxRequired(pos))); + ByteBuffer innerBuf = ByteBuffer.allocate(Math.min(chunkSize, maxRequired(pos))); fileChannel.read(innerBuf, pos, innerBuf, this); } @@ -359,24 +377,16 @@ public void cancel() { * @return A stream that emits chunks of the original whole ByteBuffer */ public static Flowable split(final ByteBuffer whole, final int chunkSize) { - return Flowable.generate(new Callable() { - @Override - public Integer call() throws Exception { - return whole.position(); - } - }, new BiFunction, Integer>() { - @Override - public Integer apply(Integer position, Emitter emitter) throws Exception { - int newLimit = Math.min(whole.limit(), position + chunkSize); - if (position >= whole.limit()) { - emitter.onComplete(); - } else { - ByteBuffer chunk = whole.duplicate(); - chunk.position(position).limit(newLimit); - emitter.onNext(chunk); - } - return newLimit; + return Flowable.generate(whole::position, (position, emitter) -> { + int newLimit = Math.min(whole.limit(), position + chunkSize); + if (position >= whole.limit()) { + emitter.onComplete(); + } else { + ByteBuffer chunk = whole.duplicate(); + chunk.position(position).limit(newLimit); + emitter.onNext(chunk); } + return newLimit; }); } diff --git a/client-runtime/src/test/java/com/microsoft/rest/v2/RestProxyStressTests.java b/client-runtime/src/test/java/com/microsoft/rest/v2/RestProxyStressTests.java index 96a4798df23b..a81184e1ef4e 100644 --- a/client-runtime/src/test/java/com/microsoft/rest/v2/RestProxyStressTests.java +++ b/client-runtime/src/test/java/com/microsoft/rest/v2/RestProxyStressTests.java @@ -19,6 +19,7 @@ import com.microsoft.rest.v2.http.HttpPipelineBuilder; import com.microsoft.rest.v2.http.HttpRequest; import com.microsoft.rest.v2.http.HttpResponse; +import com.microsoft.rest.v2.policy.AddDatePolicyFactory; import com.microsoft.rest.v2.policy.AddHeadersPolicyFactory; import com.microsoft.rest.v2.policy.HostPolicyFactory; import com.microsoft.rest.v2.policy.HttpLogDetailLevel; @@ -36,26 +37,16 @@ import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; -import org.hamcrest.CoreMatchers; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Assume; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.junit.runner.Result; -import org.junit.runner.notification.RunListener; -import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; import java.io.File; import java.io.IOException; -import java.io.InputStreamReader; import java.lang.ProcessBuilder.Redirect; -import java.lang.Thread.UncaughtExceptionHandler; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.FileChannel; @@ -70,18 +61,12 @@ import java.security.MessageDigest; import java.time.Duration; import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.Base64; import java.util.List; -import java.util.Locale; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertTrue; @@ -156,32 +141,6 @@ public static void afterClass() throws Exception { } } - private static final class AddDatePolicyFactory implements RequestPolicyFactory { - @Override - public RequestPolicy create(RequestPolicy next, RequestPolicyOptions options) { - return new AddDatePolicy(next); - } - - private static final class AddDatePolicy implements RequestPolicy { - private final DateTimeFormatter format = DateTimeFormatter - .ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'") - .withZone(ZoneId.of("UTC")) - .withLocale(Locale.US); - - private final RequestPolicy next; - - AddDatePolicy(RequestPolicy next) { - this.next = next; - } - - @Override - public Single sendAsync(HttpRequest request) { - request.headers().set("Date", format.format(OffsetDateTime.now())); - return next.sendAsync(request); - } - } - } - private static final class ThrottlingRetryPolicyFactory implements RequestPolicyFactory { @Override public RequestPolicy create(RequestPolicy next, RequestPolicyOptions options) { diff --git a/client-runtime/src/test/java/com/microsoft/rest/v2/RestProxyTests.java b/client-runtime/src/test/java/com/microsoft/rest/v2/RestProxyTests.java index ae6e34f7a65e..fe3b02544669 100644 --- a/client-runtime/src/test/java/com/microsoft/rest/v2/RestProxyTests.java +++ b/client-runtime/src/test/java/com/microsoft/rest/v2/RestProxyTests.java @@ -1400,6 +1400,37 @@ public void service24Put() { assertEquals("45", resultHeaders.value("ABC123")); } + @Host("http://httpbin.org") + interface Service25 { + @GET("anything") + HttpBinJSON get(); + + @GET("anything") + Single getAsync(); + + @GET("anything") + Single> getBodyResponseAsync(); + } + + @Test(expected = RestException.class) + public void testMissingDecodingPolicyCausesException() { + Service25 service = RestProxy.create(Service25.class, HttpPipeline.build()); + service.get(); + } + + @Test(expected = RestException.class) + public void testSingleMissingDecodingPolicyCausesException() { + Service25 service = RestProxy.create(Service25.class, HttpPipeline.build()); + service.getAsync().blockingGet(); + service.getBodyResponseAsync().blockingGet(); + } + + @Test(expected = RestException.class) + public void testSingleBodyResponseMissingDecodingPolicyCausesException() { + Service25 service = RestProxy.create(Service25.class, HttpPipeline.build()); + service.getBodyResponseAsync().blockingGet(); + } + // Helpers protected T createService(Class serviceClass) { final HttpClient httpClient = createHttpClient(); diff --git a/client-runtime/src/test/java/com/microsoft/rest/v2/policy/RequestIdPolicyTests.java b/client-runtime/src/test/java/com/microsoft/rest/v2/policy/RequestIdPolicyTests.java index b95bffdb5308..15887020b26d 100644 --- a/client-runtime/src/test/java/com/microsoft/rest/v2/policy/RequestIdPolicyTests.java +++ b/client-runtime/src/test/java/com/microsoft/rest/v2/policy/RequestIdPolicyTests.java @@ -20,6 +20,7 @@ import java.net.URL; import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; public class RequestIdPolicyTests { private final HttpResponse mockResponse = new HttpResponse() { @@ -105,7 +106,7 @@ public Single sendRequestAsync(HttpRequest request) { } }, new RequestIdPolicyFactory(), - new RetryPolicyFactory(1)); + new RetryPolicyFactory(1, 0, TimeUnit.SECONDS)); pipeline.sendRequestAsync(new HttpRequest("sameRequestIdForRetry", HttpMethod.GET, new URL("http://localhost/"), null)).blockingGet(); } diff --git a/client-runtime/src/test/java/com/microsoft/rest/v2/policy/RetryPolicyTests.java b/client-runtime/src/test/java/com/microsoft/rest/v2/policy/RetryPolicyTests.java index 69a65c239a1d..7c7dbc221a0f 100644 --- a/client-runtime/src/test/java/com/microsoft/rest/v2/policy/RetryPolicyTests.java +++ b/client-runtime/src/test/java/com/microsoft/rest/v2/policy/RetryPolicyTests.java @@ -14,6 +14,7 @@ import io.reactivex.Single; import java.net.URL; +import java.util.concurrent.TimeUnit; public class RetryPolicyTests { @Test @@ -28,7 +29,7 @@ public Single sendRequestAsync(HttpRequest request) { return Single.just(new MockHttpResponse(codes[count++])); } }, - new RetryPolicyFactory(3)); + new RetryPolicyFactory(3, 0, TimeUnit.MILLISECONDS)); HttpResponse response = pipeline.sendRequestAsync( new HttpRequest( @@ -52,7 +53,7 @@ public Single sendRequestAsync(HttpRequest request) { return Single.just(new MockHttpResponse(500)); } }, - new RetryPolicyFactory(maxRetries)); + new RetryPolicyFactory(maxRetries, 0, TimeUnit.MILLISECONDS)); HttpResponse response = pipeline.sendRequestAsync( new HttpRequest( diff --git a/client-runtime/src/test/java/com/microsoft/rest/v2/util/FlowableUtilTests.java b/client-runtime/src/test/java/com/microsoft/rest/v2/util/FlowableUtilTests.java index 072ddf00df58..a7bbcb47ce4b 100644 --- a/client-runtime/src/test/java/com/microsoft/rest/v2/util/FlowableUtilTests.java +++ b/client-runtime/src/test/java/com/microsoft/rest/v2/util/FlowableUtilTests.java @@ -2,12 +2,14 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.charset.StandardCharsets; @@ -23,49 +25,51 @@ import io.reactivex.schedulers.Schedulers; public class FlowableUtilTests { - @Test public void testCanReadSlice() throws IOException { File file = new File("target/test1"); Files.write("hello there".getBytes(StandardCharsets.UTF_8), file); - AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ); - byte[] bytes = FlowableUtil.readFile(channel, 1, 3) // - .map(bb -> toBytes(bb)) // - .collectInto(new ByteArrayOutputStream(), (bos, b) -> bos.write(b)) // - .blockingGet().toByteArray(); - assertEquals("ell", new String(bytes, StandardCharsets.UTF_8)); + try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ)) { + byte[] bytes = FlowableUtil.readFile(channel, 1, 3) // + .map(bb -> toBytes(bb)) // + .collectInto(new ByteArrayOutputStream(), (bos, b) -> bos.write(b)) // + .blockingGet().toByteArray(); + assertEquals("ell", new String(bytes, StandardCharsets.UTF_8)); + } } @Test public void testCanReadEmptyFile() throws IOException { File file = new File("target/test2"); - file.delete(); file.createNewFile(); - AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ); - byte[] bytes = FlowableUtil.readFile(channel, 1, 3) // - .map(bb -> toBytes(bb)) // - .collectInto(new ByteArrayOutputStream(), (bos, b) -> bos.write(b)) // - .blockingGet().toByteArray(); - assertEquals(0, bytes.length); + try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ)) { + byte[] bytes = FlowableUtil.readFile(channel, 1, 3) // + .map(bb -> toBytes(bb)) // + .collectInto(new ByteArrayOutputStream(), OutputStream::write) // + .blockingGet().toByteArray(); + assertEquals(0, bytes.length); + } + assertTrue(file.delete()); } @Test public void testAsynchronyShortInput() throws IOException { File file = new File("target/test3"); Files.write("hello there".getBytes(StandardCharsets.UTF_8), file); - AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ); - byte[] bytes = FlowableUtil.readFile(channel) // - .map(bb -> toBytes(bb)) // - .rebatchRequests(1) // - .subscribeOn(Schedulers.io()) // - .observeOn(Schedulers.io()) // - .collectInto(new ByteArrayOutputStream(), (bos, b) -> bos.write(b)) // - .blockingGet() // - .toByteArray(); - assertEquals("hello there", new String(bytes, StandardCharsets.UTF_8)); - file.delete(); + try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ)) { + byte[] bytes = FlowableUtil.readFile(channel) // + .map(bb -> toBytes(bb)) // + .rebatchRequests(1) // + .subscribeOn(Schedulers.io()) // + .observeOn(Schedulers.io()) // + .collectInto(new ByteArrayOutputStream(), (bos, b) -> bos.write(b)) // + .blockingGet() // + .toByteArray(); + assertEquals("hello there", new String(bytes, StandardCharsets.UTF_8)); + } + assertTrue(file.delete()); } - + private static final int NUM_CHUNKS_IN_LONG_INPUT = 10_000_000; @Test @@ -82,17 +86,18 @@ public void testAsynchronyLongInput() throws IOException, NoSuchAlgorithmExcepti System.out.println("long input file size="+ file.length()/(1024*1024) + "MB"); byte[] expected = digest.digest(); digest.reset(); - AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ); - FlowableUtil.readFile(channel) // - .rebatchRequests(1) // - .subscribeOn(Schedulers.io()) // - .observeOn(Schedulers.io()) // - .blockingForEach(bb -> digest.update(bb)); - - assertArrayEquals(expected, digest.digest()); - file.delete(); + try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ)) { + FlowableUtil.readFile(channel) // + .rebatchRequests(1) // + .subscribeOn(Schedulers.io()) // + .observeOn(Schedulers.io()) // + .blockingForEach(bb -> digest.update(bb)); + + assertArrayEquals(expected, digest.digest()); + } + assertTrue(file.delete()); } - + @Test public void testBackpressureLongInput() throws IOException, NoSuchAlgorithmException { File file = new File("target/test4"); @@ -106,26 +111,28 @@ public void testBackpressureLongInput() throws IOException, NoSuchAlgorithmExcep } byte[] expected = digest.digest(); digest.reset(); - AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ); - FlowableUtil.readFile(channel) // - .rebatchRequests(1) // - .subscribeOn(Schedulers.io()) // - .observeOn(Schedulers.io()) // - .doOnNext(bb -> digest.update(bb)) // - .test(0) // - .assertNoValues() // - .requestMore(1) // - .awaitCount(1) // - .assertValueCount(1) - .requestMore(1) // - .awaitCount(2) // - .assertValueCount(2) // - .requestMore(Long.MAX_VALUE) // - .awaitDone(20, TimeUnit.SECONDS) // - .assertComplete(); - + + try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ)) { + FlowableUtil.readFile(channel) // + .rebatchRequests(1) // + .subscribeOn(Schedulers.io()) // + .observeOn(Schedulers.io()) // + .doOnNext(bb -> digest.update(bb)) // + .test(0) // + .assertNoValues() // + .requestMore(1) // + .awaitCount(1) // + .assertValueCount(1) + .requestMore(1) // + .awaitCount(2) // + .assertValueCount(2) // + .requestMore(Long.MAX_VALUE) // + .awaitDone(20, TimeUnit.SECONDS) // + .assertComplete(); + } + assertArrayEquals(expected, digest.digest()); - file.delete(); + assertTrue(file.delete()); } @Test @@ -152,7 +159,7 @@ public void testSplitForMultipleSplitSizesFromOneTo16() throws NoSuchAlgorithmEx } @Test - public void testSplitOnEmptyContent() throws NoSuchAlgorithmException { + public void testSplitOnEmptyContent() { ByteBuffer bb = ByteBuffer.allocateDirect(16); bb.flip(); FlowableUtil // diff --git a/pom.xml b/pom.xml index e3e1c66ba99c..1c8c483d094c 100644 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,7 @@ 4.1.23.Final + 2.8.11 UTF-8 @@ -62,17 +63,12 @@ com.fasterxml.jackson.datatype jackson-datatype-jsr310 - 2.8.11 + ${jackson.version} com.fasterxml.jackson.dataformat jackson-dataformat-xml - 2.8.11 - - - org.apache.commons - commons-lang3 - 3.4 + ${jackson.version} com.microsoft.azure @@ -89,12 +85,6 @@ rxjava 2.1.12 - - org.slf4j - slf4j-simple - 1.7.22 - test - org.slf4j slf4j-api @@ -105,12 +95,6 @@ azure-annotations 1.2.0 - - junit - junit - 4.12 - test - io.netty netty-buffer @@ -126,11 +110,23 @@ netty-codec-http ${netty.version} + + + org.slf4j + slf4j-simple + 1.7.22 + test + + + junit + junit + 4.12 + test + - org.apache.maven.plugins @@ -216,6 +212,5 @@ client-runtime azure-client-runtime azure-client-authentication - client-runtime-native