Skip to content

Commit

Permalink
Fix Void response body connection leak (Azure#384)
Browse files Browse the repository at this point in the history
  • Loading branch information
RikkiGibson authored Feb 23, 2018
1 parent 29b33eb commit 5871dbe
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,8 @@ private Single<?> handleRestResponseReturnTypeAsync(HttpResponse response, Swagg

final TypeToken bodyTypeToken = TypeToken.of(bodyType);
if (bodyTypeToken.isSubtypeOf(Void.class)) {
asyncResult = Single.just(new RestResponse<>(responseStatusCode, deserializedHeaders, responseHeaders.toMap(), null));
asyncResult = response.streamBodyAsync().lastElement().ignoreElement()
.andThen(Single.just(new RestResponse<>(responseStatusCode, deserializedHeaders, responseHeaders.toMap(), null)));
} else {
final Map<String, String> rawHeaders = responseHeaders.toMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.google.common.io.BaseEncoding;
import com.microsoft.rest.v2.annotations.BodyParam;
import com.microsoft.rest.v2.annotations.DELETE;
import com.microsoft.rest.v2.annotations.ExpectedResponses;
import com.microsoft.rest.v2.annotations.GET;
import com.microsoft.rest.v2.annotations.HeaderParam;
Expand Down Expand Up @@ -154,6 +155,14 @@ interface IOService {

@GET("/javasdktest/upload/100m-{id}.dat?{sas}")
Single<RestResponse<Void, Flowable<ByteBuffer>>> download100M(@PathParam("id") String id, @PathParam(value = "sas", encoded = true) String sas);

@ExpectedResponses({ 201 })
@PUT("/testcontainer{id}?restype=container&{sas}")
Single<RestResponse<Void, Void>> createContainer(@PathParam("id") String id, @PathParam(value = "sas", encoded = true) String sas);

@ExpectedResponses({ 202 })
@DELETE("/testcontainer{id}?restype=container&{sas}")
Single<RestResponse<Void, Void>> deleteContainer(@PathParam("id") String id, @PathParam(value = "sas", encoded = true) String sas);
}

private static final Path TEMP_FOLDER_PATH = Paths.get("temp");
Expand Down Expand Up @@ -478,6 +487,38 @@ public CompletableSource call() throws Exception {
}
})).blockingAwait();

System.in.read();
Thread.sleep(10000);
}

@Test
public void testHighParallelism() throws Exception {
final String sas = System.getenv("JAVA_SDK_TEST_SAS");
HttpHeaders headers = new HttpHeaders()
.set("x-ms-version", "2017-04-17");

HttpPipeline pipeline = HttpPipeline.build(
new AddDatePolicyFactory(),
new AddHeadersPolicyFactory(headers),
new HttpLoggingPolicyFactory(HttpLogDetailLevel.BASIC));

final IOService service = RestProxy.create(IOService.class, pipeline);
Flowable.range(0, 10000)
.flatMapCompletable(new Function<Integer, CompletableSource>() {
@Override
public CompletableSource apply(Integer integer) throws Exception {
return service.createContainer(integer.toString(), sas)
.toCompletable()
.onErrorResumeNext(new Function<Throwable, CompletableSource>() {
@Override
public CompletableSource apply(Throwable throwable) throws Exception {
if (throwable instanceof RestException && ((RestException) throwable).response().statusCode() == 409) {
return Completable.complete();
} else {
return Completable.error(throwable);
}
}
}).andThen(service.deleteContainer(integer.toString(), sas).toCompletable());
}
}).blockingAwait();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public MockHttpResponse(int statusCode, String string) {
}

public MockHttpResponse(int statusCode, HttpHeaders headers) {
this(statusCode, headers, null);
this(statusCode, headers, new byte[0]);
}

public MockHttpResponse(int statusCode, HttpHeaders headers, Object serializable) {
Expand Down

0 comments on commit 5871dbe

Please sign in to comment.