Skip to content

Commit

Permalink
Do not create aggregated ByteBuf when publisher empty or first elemen…
Browse files Browse the repository at this point in the history
…t empty
  • Loading branch information
violetagg committed Feb 26, 2019
1 parent bafdb5b commit 8ac41d3
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions src/main/java/reactor/netty/http/client/HttpClientOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -775,21 +775,32 @@ public <S> NettyOutbound sendUsing(Callable<? extends S> sourceInput,

@Override
public Mono<Void> then() {
ByteBufAllocator alloc = parent.channel().alloc();
ByteBufAllocator alloc = parent.channel()
.alloc();
return Flux.from(source)
.collect(alloc::heapBuffer, ByteBuf::writeBytes)
.flatMap(agg -> {
if (!HttpUtil.isTransferEncodingChunked(request) && !HttpUtil.isContentLengthSet(request)) {
request.headers()
.setInt(HttpHeaderNames.CONTENT_LENGTH, agg.readableBytes());
}
if (agg.readableBytes() > 0) {
return parent.then().thenEmpty(FutureMono.disposableWriteAndFlush(parent.channel(), Mono.just(agg)));
}
agg.release();
return parent.then();
.switchOnFirst((signal, flux) -> {
if (signal.hasValue()) {
ByteBuf buf = signal.get();
if (buf != null && buf.readableBytes() > 0) {
return flux.collect(alloc::heapBuffer, ByteBuf::writeBytes)
.flatMap(agg -> {
if (!HttpUtil.isTransferEncodingChunked(request) &&
!HttpUtil.isContentLengthSet(request)) {
request.headers()
.setInt(HttpHeaderNames.CONTENT_LENGTH, agg.readableBytes());
}
return parent.then()
.thenEmpty(FutureMono.disposableWriteAndFlush(parent.channel(), Mono.just(agg)));
})
.doOnDiscard(ByteBuf.class, ByteBuf::release);
}
}
else if (signal.hasError()) {
return flux.onErrorResume(Mono::error);
}
return parent.then();
})
.doOnDiscard(ByteBuf.class, ByteBuf::release);
.then();
}

@Override
Expand Down

0 comments on commit 8ac41d3

Please sign in to comment.