diff --git a/sdk/core/azure-core-http-netty/pom.xml b/sdk/core/azure-core-http-netty/pom.xml index d7b0225dfce88..f3393f91520e4 100644 --- a/sdk/core/azure-core-http-netty/pom.xml +++ b/sdk/core/azure-core-http-netty/pom.xml @@ -82,6 +82,14 @@ reactor-netty + + com.azure + azure-core + 1.0.0-preview.3 + test-jar + test + + io.projectreactor reactor-test diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyWithHttpProxyNettyTests.java b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/RestProxyWithHttpProxyNettyTests.java similarity index 88% rename from sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyWithHttpProxyNettyTests.java rename to sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/RestProxyWithHttpProxyNettyTests.java index 4cd0e9cb04a42..15bb5cb6fc1e7 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyWithHttpProxyNettyTests.java +++ b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/RestProxyWithHttpProxyNettyTests.java @@ -1,11 +1,12 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -package com.azure.core.implementation; +package com.azure.core.http.netty; import com.azure.core.http.HttpClient; import com.azure.core.http.ProxyOptions; import com.azure.core.http.ProxyOptions.Type; +import com.azure.core.implementation.RestProxyTests; import org.junit.Ignore; import java.net.InetSocketAddress; diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyWithNettyTests.java b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/RestProxyWithNettyTests.java similarity index 78% rename from sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyWithNettyTests.java rename to sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/RestProxyWithNettyTests.java index d5fec7bdffbd0..ae8dad6de72d4 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyWithNettyTests.java +++ b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/RestProxyWithNettyTests.java @@ -1,9 +1,10 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -package com.azure.core.implementation; +package com.azure.core.http.netty; import com.azure.core.http.HttpClient; +import com.azure.core.implementation.RestProxyTests; public class RestProxyWithNettyTests extends RestProxyTests { diff --git a/sdk/core/azure-core-management/src/test/java/com/azure/core/management/http/MockAzureHttpClient.java b/sdk/core/azure-core-management/src/test/java/com/azure/core/management/http/MockAzureHttpClient.java index cada0ba5cd118..db3c251be2302 100644 --- a/sdk/core/azure-core-management/src/test/java/com/azure/core/management/http/MockAzureHttpClient.java +++ b/sdk/core/azure-core-management/src/test/java/com/azure/core/management/http/MockAzureHttpClient.java @@ -288,7 +288,7 @@ private static Map queryToMap(String url) { } private static String bodyToString(HttpRequest request) throws IOException { - Mono asyncString = FluxUtil.collectBytesInByteBufferStream(request.body(), false) + Mono asyncString = FluxUtil.collectBytesInByteBufferStream(request.body()) .map(bytes -> new String(bytes, StandardCharsets.UTF_8)); return asyncString.block(); } diff --git a/sdk/core/azure-core/pom.xml b/sdk/core/azure-core/pom.xml index 400e4255bbc99..a7f6746740d87 100644 --- a/sdk/core/azure-core/pom.xml +++ b/sdk/core/azure-core/pom.xml @@ -147,6 +147,20 @@ test + + + + maven-jar-plugin + + + test-jar + test-compile + + test-jar + + + + diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/http/HttpPipelineTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/http/HttpPipelineTests.java index a4070bdec0bd0..e942dabd6f066 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/http/HttpPipelineTests.java +++ b/sdk/core/azure-core/src/test/java/com/azure/core/http/HttpPipelineTests.java @@ -23,7 +23,14 @@ public class HttpPipelineTests { @Test public void constructorWithNoArguments() { - HttpPipeline pipeline = HttpPipeline.builder().build(); + HttpPipeline pipeline = HttpPipeline.builder() + .httpClient(new MockHttpClient() { + @Override + public Mono send(HttpRequest request) { + // do nothing + return null; + } + }).build(); assertEquals(0, pipeline.getPolicyCount()); assertNotNull(pipeline.httpClient()); } @@ -34,7 +41,13 @@ public void withRequestPolicy() { .policies(new PortPolicy(80, true), new ProtocolPolicy("ftp", true), new RetryPolicy()) - .build(); + .httpClient(new MockHttpClient() { + @Override + public Mono send(HttpRequest request) { + // do nothing + return null; + } + }).build(); assertEquals(3, pipeline.getPolicyCount()); assertEquals(PortPolicy.class, pipeline.getPolicy(0).getClass()); @@ -49,7 +62,13 @@ public void withRequestOptions() throws MalformedURLException { .policies(new PortPolicy(80, true), new ProtocolPolicy("ftp", true), new RetryPolicy()) - .build(); + .httpClient(new MockHttpClient() { + @Override + public Mono send(HttpRequest request) { + // do nothing + return null; + } + }).build(); HttpPipelineCallContext context = new HttpPipelineCallContext(new HttpRequest(HttpMethod.GET, new URL("http://foo.com"))); assertNotNull(context); diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/http/MockHttpClient.java b/sdk/core/azure-core/src/test/java/com/azure/core/http/MockHttpClient.java index ed5f7e7f0986d..8628798cd4ddf 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/http/MockHttpClient.java +++ b/sdk/core/azure-core/src/test/java/com/azure/core/http/MockHttpClient.java @@ -12,9 +12,7 @@ import com.azure.core.implementation.util.FluxUtil; import reactor.core.publisher.Mono; -import java.io.ByteArrayOutputStream; import java.net.URL; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.OffsetDateTime; @@ -25,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; -import java.util.stream.Collectors; /** * This HttpClient attempts to mimic the behavior of http://httpbin.org without ever making a network call. @@ -143,7 +140,7 @@ public Mono send(HttpRequest request) { json.data(createHttpBinResponseDataForRequest(request)); response = new MockHttpResponse(request, 200, json); } else if (requestPathLower.equals("/post")) { - if ("x-www-form-urlencoded".equalsIgnoreCase(contentType)) { + if (contentType != null && contentType.contains("x-www-form-urlencoded")) { Map parsed = bodyToMap(request); final HttpBinFormDataJSON json = new HttpBinFormDataJSON(); Form form = new Form(); @@ -153,6 +150,7 @@ public Mono send(HttpRequest request) { form.pizzaSize(PizzaSize.valueOf(parsed.get("size"))); form.toppings(Arrays.asList(parsed.get("toppings").split(","))); json.form(form); + response = new MockHttpResponse(request, 200, RESPONSE_HEADERS, json); } else { final HttpBinJSON json = new HttpBinJSON(); json.url(request.url().toString()); diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/FluxTestUtils.java b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/FluxTestUtils.java deleted file mode 100644 index 68f2d0b13ea5f..0000000000000 --- a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/FluxTestUtils.java +++ /dev/null @@ -1,169 +0,0 @@ -package com.azure.core.implementation; - -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoSink; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousFileChannel; -import java.nio.channels.CompletionHandler; - -public class FluxTestUtils { - - /** - * Collects byte buffers emitted by a Flux into a ByteBuf. - * - * @param stream A stream which emits ByteBuf instances. - * @param autoReleaseEnabled if ByteBuf instances in stream gets automatically released as they consumed - * @return A Mono which emits the concatenation of all the byte buffers given by the source Flux. - */ - public static Mono collectByteBufStream(Flux stream, boolean autoReleaseEnabled) { -// if (autoReleaseEnabled) { -// Mono mergedCbb = Mono.using( -// // Resource supplier -// () -> { -// CompositeByteBuf initialCbb = Unpooled.compositeBuffer(); -// return initialCbb; -// }, -// // source Mono creator -// (CompositeByteBuf initialCbb) -> { -// Mono reducedCbb = stream.reduce(initialCbb, (CompositeByteBuf currentCbb, ByteBuf nextBb) -> { -// CompositeByteBuf updatedCbb = currentCbb.addComponent(nextBb.retain()); -// return updatedCbb; -// }); -// // -// return reducedCbb -// .doOnNext((CompositeByteBuf cbb) -> cbb.writerIndex(cbb.capacity())) -// .filter((CompositeByteBuf cbb) -> cbb.isReadable()); -// }, -// // Resource cleaner -// (CompositeByteBuf finalCbb) -> finalCbb.release()); -// return mergedCbb; -// } else { -// return stream.collect(Unpooled::compositeBuffer, -// (cbb1, buffer) -> cbb1.addComponent(true, Unpooled.wrappedBuffer(buffer))) -// .filter((CompositeByteBuf cbb) -> cbb.isReadable()) -// .map(bb -> bb); -// } - - // TODO - throw new IllegalStateException("This method is not yet re-implemented"); - } - - private static final int DEFAULT_CHUNK_SIZE = 1024 * 64; - - /** - * Writes the bytes emitted by a Flux to an AsynchronousFileChannel. - * - * @param content the Flux content - * @param outFile the file channel - * @return a Completable which performs the write operation when subscribed - */ - public static Mono bytebufStreamToFile(Flux content, AsynchronousFileChannel outFile) { - return bytebufStreamToFile(content, outFile, 0); - } - - /** - * Writes the bytes emitted by a Flux to an AsynchronousFileChannel - * starting at the given position in the file. - * - * @param content the Flux content - * @param outFile the file channel - * @param position the position in the file to begin writing - * @return a Mono<Void> which performs the write operation when subscribed - */ - public static Mono bytebufStreamToFile(Flux content, AsynchronousFileChannel outFile, long position) { - return Mono.create(emitter -> content.subscribe(new ByteBufToFileSubscriber(outFile, position, emitter))); - } - - private static class ByteBufToFileSubscriber implements Subscriber { - private ByteBufToFileSubscriber(AsynchronousFileChannel outFile, long position, MonoSink emitter) { - this.outFile = outFile; - this.pos = position; - this.emitter = emitter; - } - - // volatile ensures that writes to these fields by one thread will be immediately visible to other threads. - // An I/O pool thread will write to isWriting and read isCompleted, - // while another thread may read isWriting and write to isCompleted. - volatile boolean isWriting = false; - volatile boolean isCompleted = false; - volatile Subscription subscription; - volatile long pos; - AsynchronousFileChannel outFile; - MonoSink emitter; - - @Override - public void onSubscribe(Subscription s) { - subscription = s; - s.request(1); - } - - @Override - public void onNext(ByteBuffer bytes) { - isWriting = true; - outFile.write(bytes, pos, null, onWriteCompleted); - } - - CompletionHandler onWriteCompleted = new CompletionHandler() { - @Override - public void completed(Integer bytesWritten, Object attachment) { - isWriting = false; - if (isCompleted) { - emitter.success(); - } - //noinspection NonAtomicOperationOnVolatileField - pos += bytesWritten; - if (subscription != null) { - subscription.request(1); - } - } - - @Override - public void failed(Throwable exc, Object attachment) { - if (subscription != null) { - subscription.cancel(); - } - emitter.error(exc); - } - }; - - @Override - public void onError(Throwable throwable) { - if (subscription != null) { - subscription.cancel(); - } - emitter.error(throwable); - } - - @Override - public void onComplete() { - isCompleted = true; - if (!isWriting) { - emitter.success(); - } - } - } - - /** - * Creates a {@link Flux} from an {@link AsynchronousFileChannel} - * which reads the entire file. - * - * @param fileChannel The file channel. - * @return The AsyncInputStream. - */ - public static Flux byteBufStreamFromFile(AsynchronousFileChannel fileChannel) { -// try { -// long size = fileChannel.size(); -// return byteBufStreamFromFile(fileChannel, DEFAULT_CHUNK_SIZE, 0, size); -// } catch (IOException e) { -// return Flux.error(e); -// } - - // TODO - throw new IllegalStateException("This method is not yet re-implemented"); - } -} diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyTests.java index 07d7b45e90999..d0d08272afcb8 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyTests.java +++ b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyTests.java @@ -1486,30 +1486,24 @@ interface DownloadService { @Test public void simpleDownloadTest() { -// try (StreamResponse response = createService(DownloadService.class).getBytes()) { -// int count = 0; -// for (ByteBuffer byteBuf : response.value().doOnNext(b -> b.retain()).toIterable()) { -// // assertEquals(1, byteBuf.refCnt()); -// count += byteBuf.readableBytes(); -// ReferenceCountUtil.refCnt(byteBuf); -// } -// assertEquals(30720, count); -// } - - Assert.fail("Need to implement this test again"); + try (StreamResponse response = createService(DownloadService.class).getBytes()) { + int count = 0; + for (ByteBuffer byteBuf : response.value().toIterable()) { + // assertEquals(1, byteBuf.refCnt()); + count += byteBuf.remaining(); + } + assertEquals(30720, count); + } } @Test public void rawFlowableDownloadTest() { -// Flux response = createService(DownloadService.class).getBytesFlowable(); -// int count = 0; -// for (ByteBuffer byteBuf : response.doOnNext(b -> b.retain()).toIterable()) { -// count += byteBuf.readableBytes(); -// ReferenceCountUtil.refCnt(byteBuf); -// } -// assertEquals(30720, count); - - Assert.fail("Need to implement this test again"); + Flux response = createService(DownloadService.class).getBytesFlowable(); + int count = 0; + for (ByteBuffer byteBuf : response.toIterable()) { + count += byteBuf.remaining(); + } + assertEquals(30720, count); } @Host("https://httpbin.org") @@ -1520,37 +1514,33 @@ interface FlowableUploadService { } @Test - public void flowableUploadTest() throws Exception { -// Path filePath = Paths.get(getClass().getClassLoader().getResource("upload.txt").toURI()); -// Flux stream = FluxUtil.byteBufferStreamFromFile(AsynchronousFileChannel.open(filePath)); -// -// final HttpClient httpClient = createHttpClient(); -// // Scenario: Log the body so that body buffering/replay behavior is exercised. -// // -// // Order in which policies applied will be the order in which they added to builder -// // -// final HttpPipeline httpPipeline = HttpPipeline.builder() -// .httpClient(httpClient) -// .policies(new HttpLoggingPolicy(HttpLogDetailLevel.BODY_AND_HEADERS, true)) -// .build(); -// // -// Response response = RestProxy.create(FlowableUploadService.class, httpPipeline, SERIALIZER).put(stream, Files.size(filePath)); -// -// assertEquals("The quick brown fox jumps over the lazy dog", response.value().data()); + public void fluxUploadTest() throws Exception { + Path filePath = Paths.get(getClass().getClassLoader().getResource("upload.txt").toURI()); + Flux stream = FluxUtil.readFile(AsynchronousFileChannel.open(filePath)); + + final HttpClient httpClient = createHttpClient(); + // Scenario: Log the body so that body buffering/replay behavior is exercised. + // + // Order in which policies applied will be the order in which they added to builder + // + final HttpPipeline httpPipeline = HttpPipeline.builder() + .httpClient(httpClient) + .policies(new HttpLoggingPolicy(HttpLogDetailLevel.BODY_AND_HEADERS, true)) + .build(); + // + Response response = RestProxy.create(FlowableUploadService.class, httpPipeline, SERIALIZER).put(stream, Files.size(filePath)); - Assert.fail("Need to implement this test again"); + assertEquals("The quick brown fox jumps over the lazy dog", response.value().data()); } @Test public void segmentUploadTest() throws Exception { -// Path filePath = Paths.get(getClass().getClassLoader().getResource("upload.txt").toURI()); -// AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(filePath, StandardOpenOption.READ); -// Response response = createService(FlowableUploadService.class) -// .put(FluxUtil.byteBufStreamFromFile(fileChannel, 4, 15), 15); -// -// assertEquals("quick brown fox", response.value().data()); + Path filePath = Paths.get(getClass().getClassLoader().getResource("upload.txt").toURI()); + AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(filePath, StandardOpenOption.READ); + Response response = createService(FlowableUploadService.class) + .put(FluxUtil.readFile(fileChannel, 4, 15), 15); - Assert.fail("Need to implement this test again"); + assertEquals("quick brown fox", response.value().data()); } @Host("{url}") @@ -1646,7 +1636,7 @@ interface Service26 { @Test public void postUrlFormEncoded() { - Service26 service = RestProxy.create(Service26.class, HttpPipeline.builder().build()); + Service26 service = createService(Service26.class); HttpBinFormDataJSON response = service.postForm("Foo", "123", "foo@bar.com", PizzaSize.LARGE, Arrays.asList("Bacon", "Onion")); assertNotNull(response); assertNotNull(response.form());