Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core,netty: client sends rst stream when server half-closes #4222

Merged
merged 4 commits into from
Mar 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions core/src/main/java/io/grpc/internal/AbstractClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ void writeFrame(
private final Framer framer;
private boolean useGet;
private Metadata headers;
private boolean outboundClosed;
/**
* Whether cancel() has been called. This is not strictly necessary, but removes the delay between
* cancel() being called and isReady() beginning to return false, since cancel is commonly
Expand Down Expand Up @@ -175,8 +174,8 @@ public final void deliverFrame(

@Override
public final void halfClose() {
if (!outboundClosed) {
outboundClosed = true;
if (!transportState().isOutboundClosed()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather this conditional remain like it was (non-volatile). As it is written here, it seems like some other code path could setOutboundClosed(). However, it actually appears to be an unnecessary condition (halfClose is only called once), so let's instead leave it as you have it and do a follow-up PR to remove the condition.

transportState().setOutboundClosed();
endOfMessages();
}
}
Expand Down Expand Up @@ -209,6 +208,9 @@ protected abstract static class TransportState extends AbstractStream.TransportS
private boolean deframerClosed = false;
private Runnable deframerClosedTask;

/** Whether the client has half-closed the stream. */
private volatile boolean outboundClosed;

/**
* Whether the stream is closed from the transport's perspective. This can differ from {@link
* #listenerClosed} because there may still be messages buffered to deliver to the application.
Expand Down Expand Up @@ -253,6 +255,14 @@ protected final ClientStreamListener listener() {
return listener;
}

private final void setOutboundClosed() {
outboundClosed = true;
}

protected final boolean isOutboundClosed() {
return outboundClosed;
}

/**
* Called by transport implementations when they receive headers.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,27 @@

import com.google.common.base.Preconditions;
import io.grpc.Status;
import javax.annotation.Nullable;

/**
* Command sent from a Netty client stream to the handler to cancel the stream.
*/
class CancelClientStreamCommand extends WriteQueue.AbstractQueuedCommand {
private final NettyClientStream.TransportState stream;
private final Status reason;
@Nullable private final Status reason;

CancelClientStreamCommand(NettyClientStream.TransportState stream, Status reason) {
this.stream = Preconditions.checkNotNull(stream, "stream");
Preconditions.checkNotNull(reason, "reason");
Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
Preconditions.checkArgument(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im a little uncomfortable with using null here. I think a sentinel value like Status.CANCELED.withDescription("early trailers") would be a safer approach. Anything else that accesses the reason may expect it to be non null. Also, most of gRPC makes the status non null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of null here is to be able to send RST_STREAM to the server without invoking transportReportStatus (again) in NettyClientHandler#cancelStream. This field is accessed in exactly one place (NettyClientHandler#cancelStream, which should be the only thing parsing these commands anyways?), and it checks for null to decide whether to report the status to the transport. I could accomplish the same by adding a RstStreamCommand (and had this in an earlier draft of this PR), but it would be functionally almost identical to CancelClientStreamCommand so seemed like unnecessary redundancy.

reason == null || !reason.isOk(), "Should not cancel with OK status");
this.reason = reason;
}

NettyClientStream.TransportState stream() {
return stream;
}

@Nullable
Status reason() {
return reason;
}
Expand Down
5 changes: 4 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,10 @@ public void operationComplete(ChannelFuture future) throws Exception {
private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
ChannelPromise promise) {
NettyClientStream.TransportState stream = cmd.stream();
stream.transportReportStatus(cmd.reason(), true, new Metadata());
Status reason = cmd.reason();
if (reason != null) {
stream.transportReportStatus(reason, true, new Metadata());
}
encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
}

Expand Down
3 changes: 3 additions & 0 deletions netty/src/main/java/io/grpc/netty/NettyClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ public void deframeFailed(Throwable cause) {

void transportHeadersReceived(Http2Headers headers, boolean endOfStream) {
if (endOfStream) {
if (!isOutboundClosed()) {
handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, null), true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do this after the transportTrailersReceived() call, then there should be no need to avoid the second call to transportReportStatus; we call transportReportStatus lots of times on failures and that's okay. It gets too complicated trying to avoid it.

The Status used here should probably make it obvious that it shouldn't be seen though. Maybe: Status.INTERNAL.withDescription("Cancelled stream after server closed. The server's status should be reported instead of this one")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shoot. I am wrong. The second call to transportReportStatus() will have stopDelivery=true. The call from transportTrailersReceived() will have stopDelivery=false. Let's put it back to the way you had it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
transportTrailersReceived(Utils.convertTrailers(headers));
} else {
transportHeadersReceived(Utils.convertHeaders(headers));
Expand Down
24 changes: 23 additions & 1 deletion netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,28 @@ public void inboundTrailersClosesCall() throws Exception {
stream().transportState().transportHeadersReceived(grpcResponseTrailers(Status.OK), true);
}

@Test
public void inboundTrailersBeforeHalfCloseSendsRstStream() {
stream().transportState().setId(STREAM_ID);
stream().transportState().transportHeadersReceived(grpcResponseHeaders(), false);
stream().transportState().transportHeadersReceived(grpcResponseTrailers(Status.OK), true);

// Verify a cancel stream with reason=null is sent to the handler.
ArgumentCaptor<CancelClientStreamCommand> captor = ArgumentCaptor
.forClass(CancelClientStreamCommand.class);
verify(writeQueue).enqueue(captor.capture(), eq(true));
assertNull(captor.getValue().reason());
}

@Test
public void inboundTrailersAfterHalfCloseDoesNotSendRstStream() {
stream().transportState().setId(STREAM_ID);
stream().transportState().transportHeadersReceived(grpcResponseHeaders(), false);
stream.halfClose();
stream().transportState().transportHeadersReceived(grpcResponseTrailers(Status.OK), true);
verify(writeQueue, never()).enqueue(isA(CancelClientStreamCommand.class), eq(true));
}

@Test
public void inboundStatusShouldSetStatus() throws Exception {
stream().transportState().setId(STREAM_ID);
Expand Down Expand Up @@ -293,7 +315,7 @@ public void invalidInboundHeadersCancelStream() throws Exception {
stream().transportState().transportDataReceived(Unpooled.buffer(1000).writeZero(1000), false);

// Now verify that cancel is sent and an error is reported to the listener
verify(writeQueue).enqueue(any(CancelClientStreamCommand.class), eq(true));
verify(writeQueue).enqueue(isA(CancelClientStreamCommand.class), eq(true));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(listener).closed(captor.capture(), same(PROCESSED), metadataCaptor.capture());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public void transportDataReceived(okio.Buffer frame, boolean endOfStream) {

@GuardedBy("lock")
private void onEndOfStream() {
if (!framer().isClosed()) {
if (!isOutboundClosed()) {
// If server's end-of-stream is received before client sends end-of-stream, we just send a
// reset to server to fully close the server side stream.
transport.finishStream(id(),null, PROCESSED, false, ErrorCode.CANCEL, null);
Expand Down