diff --git a/src/main/java/reactor/netty/http/client/HttpClientOperations.java b/src/main/java/reactor/netty/http/client/HttpClientOperations.java index 3b977aa092..09efee5ade 100644 --- a/src/main/java/reactor/netty/http/client/HttpClientOperations.java +++ b/src/main/java/reactor/netty/http/client/HttpClientOperations.java @@ -775,21 +775,32 @@ public NettyOutbound sendUsing(Callable sourceInput, @Override public Mono then() { - ByteBufAllocator alloc = parent.channel().alloc(); + ByteBufAllocator alloc = parent.channel() + .alloc(); return Flux.from(source) - .collect(alloc::heapBuffer, ByteBuf::writeBytes) - .flatMap(agg -> { - if (!HttpUtil.isTransferEncodingChunked(request) && !HttpUtil.isContentLengthSet(request)) { - request.headers() - .setInt(HttpHeaderNames.CONTENT_LENGTH, agg.readableBytes()); - } - if (agg.readableBytes() > 0) { - return parent.then().thenEmpty(FutureMono.disposableWriteAndFlush(parent.channel(), Mono.just(agg))); - } - agg.release(); - return parent.then(); + .switchOnFirst((signal, flux) -> { + if (signal.hasValue()) { + ByteBuf buf = signal.get(); + if (buf != null && buf.readableBytes() > 0) { + return flux.collect(alloc::heapBuffer, ByteBuf::writeBytes) + .flatMap(agg -> { + if (!HttpUtil.isTransferEncodingChunked(request) && + !HttpUtil.isContentLengthSet(request)) { + request.headers() + .setInt(HttpHeaderNames.CONTENT_LENGTH, agg.readableBytes()); + } + return parent.then() + .thenEmpty(FutureMono.disposableWriteAndFlush(parent.channel(), Mono.just(agg))); + }) + .doOnDiscard(ByteBuf.class, ByteBuf::release); + } + } + else if (signal.hasError()) { + return flux.onErrorResume(Mono::error); + } + return parent.then(); }) - .doOnDiscard(ByteBuf.class, ByteBuf::release); + .then(); } @Override