From b1de2ce2f136c097594bff31dfedfa328be327bb Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Tue, 13 Mar 2018 16:46:02 -0700 Subject: [PATCH 1/4] core,netty: client sends rst stream when server half-closes --- .../grpc/internal/AbstractClientStream.java | 16 +++++++++++++--- .../java/io/grpc/internal/MessageFramer.java | 1 - .../grpc/netty/CancelClientStreamCommand.java | 8 +++++--- .../io/grpc/netty/NettyClientHandler.java | 5 ++++- .../java/io/grpc/netty/NettyClientStream.java | 8 ++++++++ .../io/grpc/netty/NettyClientStreamTest.java | 19 ++++++++++++++++++- .../io/grpc/okhttp/OkHttpClientStream.java | 2 +- 7 files changed, 49 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 91edfb631e3..d78402730e3 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -92,7 +92,6 @@ void writeFrame( private final Framer framer; private boolean useGet; private Metadata headers; - private boolean outboundClosed; /** * Whether cancel() has been called. This is not strictly necessary, but removes the delay between * cancel() being called and isReady() beginning to return false, since cancel is commonly @@ -175,8 +174,8 @@ public final void deliverFrame( @Override public final void halfClose() { - if (!outboundClosed) { - outboundClosed = true; + if (!transportState().isOutboundClosed()) { + transportState().setOutboundClosed(); endOfMessages(); } } @@ -209,6 +208,9 @@ protected abstract static class TransportState extends AbstractStream.TransportS private boolean deframerClosed = false; private Runnable deframerClosedTask; + /** Whether the client has half-closed the stream. */ + private volatile boolean outboundClosed; + /** * Whether the stream is closed from the transport's perspective. This can differ from {@link * #listenerClosed} because there may still be messages buffered to deliver to the application. @@ -253,6 +255,14 @@ protected final ClientStreamListener listener() { return listener; } + private final void setOutboundClosed() { + outboundClosed = true; + } + + protected final boolean isOutboundClosed() { + return outboundClosed; + } + /** * Called by transport implementations when they receive headers. * diff --git a/core/src/main/java/io/grpc/internal/MessageFramer.java b/core/src/main/java/io/grpc/internal/MessageFramer.java index 8c95ffbdfb5..118417df5e9 100644 --- a/core/src/main/java/io/grpc/internal/MessageFramer.java +++ b/core/src/main/java/io/grpc/internal/MessageFramer.java @@ -78,7 +78,6 @@ void deliverFrame( private final byte[] headerScratch = new byte[HEADER_LENGTH]; private final WritableBufferAllocator bufferAllocator; private final StatsTraceContext statsTraceCtx; - // transportTracer is nullable until it is integrated with client transports private boolean closed; // Tracing and stats-related states diff --git a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java index 9ea2725cd68..f13863e2579 100644 --- a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java +++ b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java @@ -18,18 +18,19 @@ import com.google.common.base.Preconditions; import io.grpc.Status; +import javax.annotation.Nullable; /** * Command sent from a Netty client stream to the handler to cancel the stream. */ class CancelClientStreamCommand extends WriteQueue.AbstractQueuedCommand { private final NettyClientStream.TransportState stream; - private final Status reason; + @Nullable private final Status reason; CancelClientStreamCommand(NettyClientStream.TransportState stream, Status reason) { this.stream = Preconditions.checkNotNull(stream, "stream"); - Preconditions.checkNotNull(reason, "reason"); - Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status"); + Preconditions.checkArgument( + reason == null || !reason.isOk(), "Should not cancel with OK status"); this.reason = reason; } @@ -37,6 +38,7 @@ NettyClientStream.TransportState stream() { return stream; } + @Nullable Status reason() { return reason; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 552d174bbc2..fd16746b438 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -524,7 +524,10 @@ public void operationComplete(ChannelFuture future) throws Exception { private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd, ChannelPromise promise) { NettyClientStream.TransportState stream = cmd.stream(); - stream.transportReportStatus(cmd.reason(), true, new Metadata()); + Status reason = cmd.reason(); + if (reason != null) { + stream.transportReportStatus(reason, true, new Metadata()); + } encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise); } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 0e3ecfc874f..747480d3240 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -280,6 +280,11 @@ public void runOnTransportThread(final Runnable r) { } } + @Override + public void deframerClosed(boolean hasPartialMessageIgnored) { + super.deframerClosed(hasPartialMessageIgnored); + } + @Override public void bytesRead(int processedBytes) { handler.returnProcessedBytes(http2Stream, processedBytes); @@ -293,6 +298,9 @@ public void deframeFailed(Throwable cause) { void transportHeadersReceived(Http2Headers headers, boolean endOfStream) { if (endOfStream) { + if (!isOutboundClosed()) { + handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, null), true); + } transportTrailersReceived(Utils.convertTrailers(headers)); } else { transportHeadersReceived(Utils.convertHeaders(headers)); diff --git a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java index fe67313231b..41cbad294e2 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java @@ -261,6 +261,23 @@ public void inboundTrailersClosesCall() throws Exception { stream().transportState().transportHeadersReceived(grpcResponseTrailers(Status.OK), true); } + @Test + public void inboundTrailersBeforeHalfCloseSendsRstStream() { + stream().transportState().setId(STREAM_ID); + stream().transportState().transportHeadersReceived(grpcResponseHeaders(), false); + stream().transportState().transportHeadersReceived(grpcResponseTrailers(Status.OK), true); + verify(writeQueue).enqueue(isA(CancelClientStreamCommand.class), eq(true)); + } + + @Test + public void inboundTrailersAfterHalfCloseDoesNotSendRstStream() { + stream().transportState().setId(STREAM_ID); + stream().transportState().transportHeadersReceived(grpcResponseHeaders(), false); + stream.halfClose(); + stream().transportState().transportHeadersReceived(grpcResponseTrailers(Status.OK), true); + verify(writeQueue, never()).enqueue(isA(CancelClientStreamCommand.class), eq(true)); + } + @Test public void inboundStatusShouldSetStatus() throws Exception { stream().transportState().setId(STREAM_ID); @@ -293,7 +310,7 @@ public void invalidInboundHeadersCancelStream() throws Exception { stream().transportState().transportDataReceived(Unpooled.buffer(1000).writeZero(1000), false); // Now verify that cancel is sent and an error is reported to the listener - verify(writeQueue).enqueue(any(CancelClientStreamCommand.class), eq(true)); + verify(writeQueue).enqueue(isA(CancelClientStreamCommand.class), eq(true)); ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); verify(listener).closed(captor.capture(), same(PROCESSED), metadataCaptor.capture()); diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 4cb6153a9c4..7a5ee95dab7 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -320,7 +320,7 @@ public void transportDataReceived(okio.Buffer frame, boolean endOfStream) { @GuardedBy("lock") private void onEndOfStream() { - if (!framer().isClosed()) { + if (!isOutboundClosed()) { // If server's end-of-stream is received before client sends end-of-stream, we just send a // reset to server to fully close the server side stream. transport.finishStream(id(),null, PROCESSED, false, ErrorCode.CANCEL, null); From 6ec1ebd066a471d638adb2b1f0379ca96fed8b99 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 21 Mar 2018 17:38:09 -0700 Subject: [PATCH 2/4] address comments --- .../grpc/netty/CancelClientStreamCommand.java | 8 +++----- .../java/io/grpc/netty/NettyClientHandler.java | 5 +---- .../java/io/grpc/netty/NettyClientStream.java | 17 ++++++++++------- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java index f13863e2579..9ea2725cd68 100644 --- a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java +++ b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java @@ -18,19 +18,18 @@ import com.google.common.base.Preconditions; import io.grpc.Status; -import javax.annotation.Nullable; /** * Command sent from a Netty client stream to the handler to cancel the stream. */ class CancelClientStreamCommand extends WriteQueue.AbstractQueuedCommand { private final NettyClientStream.TransportState stream; - @Nullable private final Status reason; + private final Status reason; CancelClientStreamCommand(NettyClientStream.TransportState stream, Status reason) { this.stream = Preconditions.checkNotNull(stream, "stream"); - Preconditions.checkArgument( - reason == null || !reason.isOk(), "Should not cancel with OK status"); + Preconditions.checkNotNull(reason, "reason"); + Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status"); this.reason = reason; } @@ -38,7 +37,6 @@ NettyClientStream.TransportState stream() { return stream; } - @Nullable Status reason() { return reason; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index fd16746b438..552d174bbc2 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -524,10 +524,7 @@ public void operationComplete(ChannelFuture future) throws Exception { private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd, ChannelPromise promise) { NettyClientStream.TransportState stream = cmd.stream(); - Status reason = cmd.reason(); - if (reason != null) { - stream.transportReportStatus(reason, true, new Metadata()); - } + stream.transportReportStatus(cmd.reason(), true, new Metadata()); encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise); } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 747480d3240..022e21ceb3f 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -280,11 +280,6 @@ public void runOnTransportThread(final Runnable r) { } } - @Override - public void deframerClosed(boolean hasPartialMessageIgnored) { - super.deframerClosed(hasPartialMessageIgnored); - } - @Override public void bytesRead(int processedBytes) { handler.returnProcessedBytes(http2Stream, processedBytes); @@ -298,10 +293,18 @@ public void deframeFailed(Throwable cause) { void transportHeadersReceived(Http2Headers headers, boolean endOfStream) { if (endOfStream) { + transportTrailersReceived(Utils.convertTrailers(headers)); if (!isOutboundClosed()) { - handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, null), true); + handler + .getWriteQueue() + .enqueue( + new CancelClientStreamCommand( + this, + Status.INTERNAL.withDescription( + "Cancelled stream after server closed. " + + "The server's status should be reported instead of this one")), + true); } - transportTrailersReceived(Utils.convertTrailers(headers)); } else { transportHeadersReceived(Utils.convertHeaders(headers)); } From 62cb7cf4a25430f5fb9f5243a44f2749706773f4 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 21 Mar 2018 17:39:42 -0700 Subject: [PATCH 3/4] revert comment cleanup (simplify backport) --- core/src/main/java/io/grpc/internal/MessageFramer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/io/grpc/internal/MessageFramer.java b/core/src/main/java/io/grpc/internal/MessageFramer.java index 118417df5e9..8c95ffbdfb5 100644 --- a/core/src/main/java/io/grpc/internal/MessageFramer.java +++ b/core/src/main/java/io/grpc/internal/MessageFramer.java @@ -78,6 +78,7 @@ void deliverFrame( private final byte[] headerScratch = new byte[HEADER_LENGTH]; private final WritableBufferAllocator bufferAllocator; private final StatsTraceContext statsTraceCtx; + // transportTracer is nullable until it is integrated with client transports private boolean closed; // Tracing and stats-related states From 1e3e2692ec439b28a4312c3f723f54f86f831849 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Thu, 22 Mar 2018 10:52:03 -0700 Subject: [PATCH 4/4] set cancel command reason to null --- .../io/grpc/netty/CancelClientStreamCommand.java | 8 +++++--- .../main/java/io/grpc/netty/NettyClientHandler.java | 5 ++++- .../main/java/io/grpc/netty/NettyClientStream.java | 12 ++---------- .../java/io/grpc/netty/NettyClientStreamTest.java | 7 ++++++- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java index 9ea2725cd68..f13863e2579 100644 --- a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java +++ b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java @@ -18,18 +18,19 @@ import com.google.common.base.Preconditions; import io.grpc.Status; +import javax.annotation.Nullable; /** * Command sent from a Netty client stream to the handler to cancel the stream. */ class CancelClientStreamCommand extends WriteQueue.AbstractQueuedCommand { private final NettyClientStream.TransportState stream; - private final Status reason; + @Nullable private final Status reason; CancelClientStreamCommand(NettyClientStream.TransportState stream, Status reason) { this.stream = Preconditions.checkNotNull(stream, "stream"); - Preconditions.checkNotNull(reason, "reason"); - Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status"); + Preconditions.checkArgument( + reason == null || !reason.isOk(), "Should not cancel with OK status"); this.reason = reason; } @@ -37,6 +38,7 @@ NettyClientStream.TransportState stream() { return stream; } + @Nullable Status reason() { return reason; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 552d174bbc2..fd16746b438 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -524,7 +524,10 @@ public void operationComplete(ChannelFuture future) throws Exception { private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd, ChannelPromise promise) { NettyClientStream.TransportState stream = cmd.stream(); - stream.transportReportStatus(cmd.reason(), true, new Metadata()); + Status reason = cmd.reason(); + if (reason != null) { + stream.transportReportStatus(reason, true, new Metadata()); + } encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise); } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 022e21ceb3f..e569ca9d8f9 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -293,18 +293,10 @@ public void deframeFailed(Throwable cause) { void transportHeadersReceived(Http2Headers headers, boolean endOfStream) { if (endOfStream) { - transportTrailersReceived(Utils.convertTrailers(headers)); if (!isOutboundClosed()) { - handler - .getWriteQueue() - .enqueue( - new CancelClientStreamCommand( - this, - Status.INTERNAL.withDescription( - "Cancelled stream after server closed. " - + "The server's status should be reported instead of this one")), - true); + handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, null), true); } + transportTrailersReceived(Utils.convertTrailers(headers)); } else { transportHeadersReceived(Utils.convertHeaders(headers)); } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java index 41cbad294e2..4ff3b4f6476 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java @@ -266,7 +266,12 @@ public void inboundTrailersBeforeHalfCloseSendsRstStream() { stream().transportState().setId(STREAM_ID); stream().transportState().transportHeadersReceived(grpcResponseHeaders(), false); stream().transportState().transportHeadersReceived(grpcResponseTrailers(Status.OK), true); - verify(writeQueue).enqueue(isA(CancelClientStreamCommand.class), eq(true)); + + // Verify a cancel stream with reason=null is sent to the handler. + ArgumentCaptor captor = ArgumentCaptor + .forClass(CancelClientStreamCommand.class); + verify(writeQueue).enqueue(captor.capture(), eq(true)); + assertNull(captor.getValue().reason()); } @Test