Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RequestContent #21320

Merged
merged 35 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
fa44787
Prototype for RequestContent
alzimmermsft May 11, 2021
fa4171d
Additional implementations of RequestContent
alzimmermsft May 12, 2021
0c75878
Add RequestOutbound implementations to HTTP client libraries
alzimmermsft May 12, 2021
ba65d71
Merge branch 'master' into AzCore_RequestContent
alzimmermsft May 14, 2021
929aa54
Add asFluxByteBuffer to RequestContent
alzimmermsft May 15, 2021
ef8ff2c
Merge branch 'master' into AzCore_RequestContent
alzimmermsft May 18, 2021
d76af17
Updates to RequestContent and move OkHttp to using RequestContent#wri…
alzimmermsft May 19, 2021
489a467
Merge branch 'master' into AzCore_RequestContent
alzimmermsft May 28, 2021
184bcb9
Fix Netty testing error
alzimmermsft May 28, 2021
7639d51
Merge branch 'master' into AzCore_RequestContent
alzimmermsft Jun 1, 2021
0283bc0
Updates to sending OkHttp requests
alzimmermsft Jun 1, 2021
8e881db
Merge branch 'master' into AzCore_RequestContent
alzimmermsft Jun 1, 2021
f51d356
Merge branch 'master' into AzCore_RequestContent
alzimmermsft Jun 2, 2021
67f6e82
Merge branch 'master' into AzCore_RequestContent
alzimmermsft Jun 3, 2021
1fcf77a
Removed writeTo, added BufferedByteBufferFlux
alzimmermsft Jun 3, 2021
91d3ba2
Merge branch 'master' into AzCore_RequestContent
alzimmermsft Jun 3, 2021
75a6059
Add RequestContent tests
alzimmermsft Jun 3, 2021
86cb04d
Merge branch 'master' into AzCore_RequestContent
alzimmermsft Jun 3, 2021
34f47ba
Merge branch 'master' into AzCore_RequestContent
alzimmermsft Jun 4, 2021
077d1ed
Add factory methods for InputStream RequestContent
alzimmermsft Jun 4, 2021
443643c
Merge branch 'master' into AzCore_RequestContent
alzimmermsft Jun 7, 2021
f99b4ff
Merge branch 'master' into AzCore_RequestContent
alzimmermsft Jun 8, 2021
9ecfb53
Merge branch 'master' into AzCore_RequestContent
alzimmermsft Jun 9, 2021
95b75ef
Merge branch 'master' into AzCore_RequestContent
alzimmermsft Jun 10, 2021
bf1898b
Make RequestContent an abstract class instead of interface
alzimmermsft Jun 10, 2021
871a299
Merge branch 'master' into AzCore_RequestContent
alzimmermsft Jun 16, 2021
80a3c8d
Cache the serialized object
alzimmermsft Jun 16, 2021
645c0fc
Fix linting issues
alzimmermsft Jun 16, 2021
7b70698
Merge branch 'main' into AzCore_RequestContent
alzimmermsft Jun 21, 2021
d2739fa
Merge branch 'main' into AzCore_RequestContent
alzimmermsft Jun 24, 2021
fcd70dc
Hide some APIs, added more tests
alzimmermsft Jun 24, 2021
49665ed
Merge branch 'main' into AzCore_RequestContent
alzimmermsft Jun 25, 2021
d7b4a97
Doc updates and PR feedback
alzimmermsft Jun 25, 2021
b23308d
Overloads to configure chunkSize
alzimmermsft Jun 25, 2021
d4bc387
Remove some overloads
alzimmermsft Jun 26, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ public class GoodLoggingCheck extends AbstractCheck {
private static final String CLIENT_LOGGER = "ClientLogger";
private static final String LOGGER = "logger";
private static final String STATIC_LOGGER_ERROR = "Use a static ClientLogger instance in a static method.";
private static final int[] REQUIRED_TOKENS = new int[]{
TokenTypes.IMPORT,
TokenTypes.INTERFACE_DEF,
TokenTypes.CLASS_DEF,
TokenTypes.LITERAL_NEW,
TokenTypes.VARIABLE_DEF,
TokenTypes.METHOD_CALL,
TokenTypes.METHOD_DEF
};

private static final String LOGGER_NAME_ERROR =
"ClientLogger instance naming: use ''%s'' instead of ''%s'' for consistency.";
Expand All @@ -42,7 +51,7 @@ public class GoodLoggingCheck extends AbstractCheck {
// Boolean indicator that indicates if the java class imports ClientLogger
private boolean hasClientLoggerImported;
// A LIFO queue stores the class names, pop top element if exist the class name AST node
private Queue<String> classNameDeque = Collections.asLifoQueue(new ArrayDeque<>());
private final Queue<String> classNameDeque = Collections.asLifoQueue(new ArrayDeque<>());
// Collection of Invalid logging packages
private static final Set<String> INVALID_LOGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
"org.slf4j", "org.apache.logging.log4j", "java.util.logging"
Expand All @@ -60,14 +69,7 @@ public int[] getAcceptableTokens() {

@Override
public int[] getRequiredTokens() {
return new int[] {
TokenTypes.IMPORT,
TokenTypes.CLASS_DEF,
TokenTypes.LITERAL_NEW,
TokenTypes.VARIABLE_DEF,
TokenTypes.METHOD_CALL,
TokenTypes.METHOD_DEF
};
return REQUIRED_TOKENS;
}

@Override
Expand Down Expand Up @@ -96,6 +98,7 @@ public void visitToken(DetailAST ast) {
});
break;
case TokenTypes.CLASS_DEF:
case TokenTypes.INTERFACE_DEF:
classNameDeque.offer(ast.findFirstToken(TokenTypes.IDENT).getText());
break;
case TokenTypes.LITERAL_NEW:
Expand Down Expand Up @@ -194,21 +197,20 @@ private void checkForInvalidStaticLoggerUsage(DetailAST methodDefToken) {
// if not a static method
if (!(TokenUtil.findFirstTokenByPredicate(methodDefToken,
node -> node.branchContains(TokenTypes.LITERAL_STATIC)).isPresent())) {

// error if static `LOGGER` present, LOGGER.*
if (methodDefToken.findFirstToken(TokenTypes.SLIST) != null) {
TokenUtil
.forEachChild(methodDefToken.findFirstToken(TokenTypes.SLIST), TokenTypes.EXPR, (exprToken) -> {
if (exprToken != null) {
DetailAST methodCallToken = exprToken.findFirstToken(TokenTypes.METHOD_CALL);
if (methodCallToken != null && methodCallToken.findFirstToken(TokenTypes.DOT) != null) {
if (methodCallToken.findFirstToken(TokenTypes.DOT)
.findFirstToken(TokenTypes.IDENT).getText().equals(LOGGER.toUpperCase())) {
log(methodDefToken, STATIC_LOGGER_ERROR);
}
TokenUtil.forEachChild(methodDefToken.findFirstToken(TokenTypes.SLIST), TokenTypes.EXPR, exprToken -> {
if (exprToken != null) {
DetailAST methodCallToken = exprToken.findFirstToken(TokenTypes.METHOD_CALL);
if (methodCallToken != null && methodCallToken.findFirstToken(TokenTypes.DOT) != null) {
if (methodCallToken.findFirstToken(TokenTypes.DOT)
.findFirstToken(TokenTypes.IDENT).getText().equals(LOGGER.toUpperCase())) {
log(methodDefToken, STATIC_LOGGER_ERROR);
}
}
});
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* Package containing implementation details.
*/
package com.azure.core.http.netty.implementation;
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.function.Function;

/**
* HttpClient implementation for OkHttp.
Expand Down Expand Up @@ -64,9 +63,13 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
// but block on the thread backing flux. This ignore any subscribeOn applied to send(r)
//
toOkHttpRequest(request).subscribe(okHttpRequest -> {
Call call = httpClient.newCall(okHttpRequest);
call.enqueue(new OkHttpCallback(sink, request, eagerlyReadResponse));
sink.onCancel(call::cancel);
try {
Call call = httpClient.newCall(okHttpRequest);
call.enqueue(new OkHttpCallback(sink, request, eagerlyReadResponse));
sink.onCancel(call::cancel);
} catch (Exception ex) {
sink.error(ex);
}
}, sink::error);
}));
}
Expand All @@ -78,29 +81,26 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
* @return the Mono emitting okhttp request
*/
private static Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request) {
return Mono.just(new okhttp3.Request.Builder())
.map(rb -> {
rb.url(request.getUrl());
if (request.getHeaders() != null) {
for (HttpHeader hdr : request.getHeaders()) {
// OkHttp allows for headers with multiple values, but it treats them as separate headers,
// therefore, we must call rb.addHeader for each value, using the same key for all of them
hdr.getValuesList().forEach(value -> rb.addHeader(hdr.getName(), value));
}
}
return rb;
})
.flatMap((Function<Request.Builder, Mono<Request.Builder>>) rb -> {
if (request.getHttpMethod() == HttpMethod.GET) {
return Mono.just(rb.get());
} else if (request.getHttpMethod() == HttpMethod.HEAD) {
return Mono.just(rb.head());
} else {
return toOkHttpRequestBody(request.getBody(), request.getHeaders())
.map(requestBody -> rb.method(request.getHttpMethod().toString(), requestBody));
}
})
.map(Request.Builder::build);
Request.Builder requestBuilder = new Request.Builder()
.url(request.getUrl());

if (request.getHeaders() != null) {
for (HttpHeader hdr : request.getHeaders()) {
// OkHttp allows for headers with multiple values, but it treats them as separate headers,
// therefore, we must call rb.addHeader for each value, using the same key for all of them
hdr.getValuesList().forEach(value -> requestBuilder.addHeader(hdr.getName(), value));
}
}

if (request.getHttpMethod() == HttpMethod.GET) {
return Mono.just(requestBuilder.get().build());
} else if (request.getHttpMethod() == HttpMethod.HEAD) {
return Mono.just(requestBuilder.head().build());
}

return toOkHttpRequestBody(request.getBody(), request.getHeaders())
.map(okhttpRequestBody -> requestBuilder.method(request.getHttpMethod().toString(), okhttpRequestBody)
.build());
}

/**
Expand All @@ -117,11 +117,9 @@ private static Mono<RequestBody> toOkHttpRequestBody(Flux<ByteBuffer> bbFlux, Ht

return bsMono.map(bs -> {
String contentType = headers.getValue("Content-Type");
if (contentType == null) {
return RequestBody.create(bs, null);
} else {
return RequestBody.create(bs, MediaType.parse(contentType));
}
MediaType mediaType = (contentType == null) ? null : MediaType.parse(contentType);

return RequestBody.create(bs, mediaType);
});
}

Expand All @@ -146,9 +144,7 @@ private static Mono<ByteString> toByteString(Flux<ByteBuffer> bbFlux) {
} catch (IOException ioe) {
throw Exceptions.propagate(ioe);
}
})
.map(b -> ByteString.of(b.readByteArray())),
okio.Buffer::clear)
}).map(b -> ByteString.of(b.readByteArray())), okio.Buffer::clear)
.switchIfEmpty(EMPTY_BYTE_STRING_MONO);
}

Expand All @@ -163,11 +159,13 @@ private static class OkHttpCallback implements okhttp3.Callback {
this.eagerlyReadResponse = eagerlyReadResponse;
}

@SuppressWarnings("NullableProblems")
@Override
public void onFailure(okhttp3.Call call, IOException e) {
sink.error(e);
}

@SuppressWarnings("NullableProblems")
@Override
public void onResponse(okhttp3.Call call, okhttp3.Response response) {
/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* Package containing implementation details.
*/
package com.azure.core.http.okhttp.implementation;
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -40,6 +39,7 @@
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertLinesMatch;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -66,6 +66,7 @@ public static void beforeClass() {
server.stubFor(post("/shortPost").willReturn(aResponse().withBody(SHORT_BODY)));
server.stubFor(get(RETURN_HEADERS_AS_IS_PATH).willReturn(aResponse()
.withTransformers(OkHttpAsyncHttpClientResponseTransformer.NAME)));

server.start();
}

Expand All @@ -87,25 +88,33 @@ public void testFlowableResponseLongBodyAsByteArrayAsync() {
}

@Test
@Disabled("This tests behaviour of reactor netty's ByteBufFlux, not applicable for OkHttp")
public void testMultipleSubscriptionsEmitsError() {
HttpResponse response = getResponse("/short");

// Subscription:1
response.getBodyAsByteArray().block();
StepVerifier.create(response.getBodyAsByteArray())
.assertNext(Assertions::assertNotNull)
.expectComplete()
.verify(Duration.ofSeconds(20));

// Subscription:2
// Getting the bytes of an OkHttp response closes the stream on first read.
// Subsequent reads will return an IllegalStateException due to the stream being closed.
StepVerifier.create(response.getBodyAsByteArray())
.expectNextCount(0) // TODO: Check with smaldini, what is the verifier operator equivalent to .awaitDone(20, TimeUnit.SECONDS)
.verifyError(IllegalStateException.class);
.expectNextCount(0)
.expectError(IllegalStateException.class)
.verify(Duration.ofSeconds(20));

}

@Test
public void testFlowableWhenServerReturnsBodyAndNoErrorsWhenHttp500Returned() {
HttpResponse response = getResponse("/error");
StepVerifier.create(response.getBodyAsString())
.expectNext("error") // TODO: .awaitDone(20, TimeUnit.SECONDS) [See previous todo]
.verifyComplete();
assertEquals(500, response.getStatusCode());
StepVerifier.create(response.getBodyAsString())
.expectNext("error")
.expectComplete()
.verify(Duration.ofSeconds(20));
}

@Test
Expand All @@ -128,7 +137,7 @@ public void testFlowableBackpressure() {

@Test
public void testRequestBodyIsErrorShouldPropagateToResponse() {
HttpClient client = HttpClient.createDefault();
HttpClient client = new OkHttpAsyncClientProvider().createInstance();
HttpRequest request = new HttpRequest(HttpMethod.POST, url(server, "/shortPost"))
.setHeader("Content-Length", "123")
.setBody(Flux.error(new RuntimeException("boo")));
Expand All @@ -140,7 +149,7 @@ public void testRequestBodyIsErrorShouldPropagateToResponse() {

@Test
public void testRequestBodyEndsInErrorShouldPropagateToResponse() {
HttpClient client = HttpClient.createDefault();
HttpClient client = new OkHttpAsyncClientProvider().createInstance();
String contentChunk = "abcdefgh";
int repetitions = 1000;
HttpRequest request = new HttpRequest(HttpMethod.POST, url(server, "/shortPost"))
Expand All @@ -149,10 +158,14 @@ public void testRequestBodyEndsInErrorShouldPropagateToResponse() {
.repeat(repetitions)
.map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
.concatWith(Flux.error(new RuntimeException("boo"))));
StepVerifier.create(client.send(request))
// .awaitDone(10, TimeUnit.SECONDS)
.expectErrorMessage("boo")
.verify();

try {
StepVerifier.create(client.send(request))
.expectErrorMessage("boo")
.verify(Duration.ofSeconds(10));
} catch (Exception ex) {
assertEquals("boo", ex.getMessage());
}
}

@Test
Expand Down Expand Up @@ -200,42 +213,30 @@ public void testServerShutsDownSocketShouldPushErrorToContentFlowable() {
});
}

@Disabled("This flakey test fails often on MacOS. https://github.com/Azure/azure-sdk-for-java/issues/4357.")
@Test
public void testConcurrentRequests() throws NoSuchAlgorithmException {
int numRequests = 100; // 100 = 1GB of data read
HttpClient client = HttpClient.createDefault();
HttpClient client = new OkHttpAsyncClientProvider().createInstance();
byte[] expectedDigest = digest(LONG_BODY);
long expectedByteCount = (long) numRequests * LONG_BODY.getBytes(StandardCharsets.UTF_8).length;

Mono<Long> numBytesMono = Flux.range(1, numRequests)
.parallel(10)
.runOn(Schedulers.boundedElastic())
.flatMap(n -> Mono.fromCallable(() -> getResponse(client, "/long")).flatMapMany(response -> {
MessageDigest md = md5Digest();
return response.getBody()
.doOnNext(md::update)
.map(bb -> new NumberedByteBuffer(n, bb))
// .doOnComplete(() -> System.out.println("completed " + n))
.doOnComplete(() -> Assertions.assertArrayEquals(expectedDigest,
md.digest(), "wrong digest!"));
.doOnNext(buffer -> md.update(buffer.duplicate()))
.doOnComplete(() -> assertArrayEquals(expectedDigest, md.digest(), "wrong digest!"));
}))
.sequential()
// enable the doOnNext call to see request numbers and thread names
// .doOnNext(g -> System.out.println(g.n + " " +
// Thread.currentThread().getName()))
.map(nbb -> (long) nbb.bb.limit())
.reduce(Long::sum)
.subscribeOn(Schedulers.boundedElastic());
.map(buffer -> (long) buffer.remaining())
.reduce(Long::sum);

StepVerifier.create(numBytesMono)
// .awaitDone(timeoutSeconds, TimeUnit.SECONDS)
.expectNext((long) (numRequests * LONG_BODY.getBytes(StandardCharsets.UTF_8).length))
.verifyComplete();
//
// long numBytes = numBytesMono.block();
// t = System.currentTimeMillis() - t;
// System.out.println("totalBytesRead=" + numBytes / 1024 / 1024 + "MB in " + t / 1000.0 + "s");
// assertEquals(numRequests * LONG_BODY.getBytes(StandardCharsets.UTF_8).length, numBytes);
.expectNext(expectedByteCount)
.expectComplete()
.verify(Duration.ofSeconds(60));
}

@Test
Expand Down Expand Up @@ -284,16 +285,6 @@ private static byte[] digest(String s) throws NoSuchAlgorithmException {
return md.digest();
}

private static final class NumberedByteBuffer {
final long n;
final ByteBuffer bb;

NumberedByteBuffer(long n, ByteBuffer bb) {
this.n = n;
this.bb = bb;
}
}

private static HttpResponse getResponse(String path) {
HttpClient client = new OkHttpAsyncHttpClientBuilder().build();
return getResponse(client, path);
Expand Down
1 change: 1 addition & 0 deletions sdk/core/azure-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@
--add-opens com.azure.core/com.azure.core.implementation.models.jsonflatten=com.fasterxml.jackson.databind
--add-opens com.azure.core/com.azure.core.implementation.models.jsonflatten=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.implementation.serializer=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.models=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.util=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.util.jsonpatch=ALL-UNNAMED
Expand Down
Loading