From 904ed6f59181ffb7437c59ed36737c407a8ca270 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Tue, 16 Jan 2024 13:44:41 -0800 Subject: [PATCH] h2: disable auto read for stream channels (#2799) Motivation: HTTP/2 stream/child channel use cases don't disable autoRead explicitly. This means we may read and queue more data than desirable. Modifications: - Disable auto read for h2 stream/child channels. --- .../grpc/netty/ErrorHandlingTest.java | 14 ++++++++++++- .../H2PriorKnowledgeFeatureParityTest.java | 20 +++++++++++++++++++ .../internal/DefaultNettyConnection.java | 6 ++++++ .../netty/internal/NettyChannelPublisher.java | 4 ++++ 4 files changed, 43 insertions(+), 1 deletion(-) diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ErrorHandlingTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ErrorHandlingTest.java index ef48150a52..ce6c1967e8 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ErrorHandlingTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ErrorHandlingTest.java @@ -79,6 +79,7 @@ import static io.servicetalk.utils.internal.ThrowableUtils.throwException; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -108,6 +109,8 @@ class ErrorHandlingTest { private BlockingTesterClient blockingClient; @Nullable private Publisher requestPublisher; + @Nullable + private GrpcExecutionStrategy clientExecutionStrategy; private enum TestMode { HttpClientFilterThrows, @@ -158,6 +161,7 @@ boolean isSafeNoOffload() { private void setUp(TestMode testMode, GrpcExecutionStrategy serverStrategy, GrpcExecutionStrategy clientStrategy) throws Exception { this.testMode = testMode; + this.clientExecutionStrategy = clientStrategy; cannedResponse = TestResponse.newBuilder().setMessage("foo").build(); ServiceFactory serviceFactory; StreamingHttpServiceFilterFactory serviceFilterFactory = IDENTITY_FILTER; @@ -643,7 +647,15 @@ private void verifyException(final Throwable cause) { assertNotNull(cause); assertThat(assertThrows(GrpcStatusException.class, () -> { throw cause; - }).status().code(), equalTo(expectedStatus())); + }).status().code(), either(equalTo(expectedStatus())).or(equalTo(expectedStatusSecondary()))); + } + + private GrpcStatusCode expectedStatusSecondary() { + // The server writes trailers with expected status then a RST stream frame. The client may not be done with its + // write operation and the RST may be result in CANCELLED being propagated instead. + return testMode == TestMode.ServiceSecondOperatorThrowsGrpcException && + clientExecutionStrategy != null && clientExecutionStrategy.hasOffloads() ? + GrpcStatusCode.CANCELLED : expectedStatus(); } private GrpcStatusCode expectedStatus() { diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java index 9e5e283175..7a44f1b6d0 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2PriorKnowledgeFeatureParityTest.java @@ -140,6 +140,7 @@ import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static io.servicetalk.data.jackson.JacksonSerializerFactory.JACKSON; import static io.servicetalk.http.api.HeaderUtils.isTransferEncodingChunked; import static io.servicetalk.http.api.HttpHeaderNames.CONNECTION; import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH; @@ -157,6 +158,7 @@ import static io.servicetalk.http.api.HttpResponseStatus.EXPECTATION_FAILED; import static io.servicetalk.http.api.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static io.servicetalk.http.api.HttpResponseStatus.OK; +import static io.servicetalk.http.api.HttpSerializers.jsonStreamingSerializer; import static io.servicetalk.http.api.HttpSerializers.textSerializerUtf8; import static io.servicetalk.http.netty.AbstractStreamingHttpConnection.MAX_CONCURRENCY_NO_OFFLOADING; import static io.servicetalk.http.netty.AsyncContextHttpFilterVerifier.K1; @@ -1306,6 +1308,24 @@ public Single request(final StreamingHttpRequest request) } } + @ParameterizedTest(name = "{displayName} [{index}] client={0}, h2PriorKnowledge={1}") + @MethodSource("clientExecutors") + void backpressureNoSOOEForLargePayloads(HttpTestExecutionStrategy strategy, boolean h2PriorKnowledge) + throws Exception { + setUp(strategy, h2PriorKnowledge); + InetSocketAddress serverAddress = bindHttpEchoServer(); + StreamingHttpClient client = forSingleAddress(HostAndPort.of(serverAddress)) + .protocols(h2PriorKnowledge ? h2Default() : h1Default()) + .executionStrategy(clientExecutionStrategy).buildStreaming(); + + StreamingHttpRequest request = client.post("/").payloadBody(Publisher.range(0, 10_000), + jsonStreamingSerializer(JACKSON.streamingSerializerDeserializer(Integer.class))); + StreamingHttpResponse response = client.request(request).toFuture().get(); + + response.messageBody().ignoreElements().toFuture().get(); + client.close(); + } + @ParameterizedTest(name = "{displayName} [{index}] client={0}, h2PriorKnowledge={1}") @MethodSource("clientExecutors") void serverGracefulClose(HttpTestExecutionStrategy strategy, boolean h2PriorKnowledge) throws Exception { diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java index c45f3a1537..fda950f38d 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java @@ -343,6 +343,12 @@ private static DefaultNettyConnection initChildChanne Predicate shouldWait, UnaryOperator enrichProtocolError) { assert parent == null || parent.executionContext() == executionContext; assert channel.eventLoop() == toEventLoopAwareNettyIoExecutor(executionContext.ioExecutor()).eventLoopGroup(); + + // For h2 the parent channel must use auto read because control frames and flow controlled frames are on the + // same socket, and we must read in timely manner to avoid deadlock. Child channel should not use auto + // read as read is explicitly called by NettyChannelPublisher according to the Subscription.request(n) demand. + channel.config().setAutoRead(false); + DefaultNettyConnection connection = new DefaultNettyConnection<>(channel, parent, executionContext, closeHandler, flushStrategy, idleTimeoutMs, protocol, sslConfig, sslSession, parentChannelConfig, streamObserver.streamEstablished(), isClient, shouldWait, enrichProtocolError); diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyChannelPublisher.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyChannelPublisher.java index bbd095bcaa..1976bf02c2 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyChannelPublisher.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyChannelPublisher.java @@ -46,6 +46,10 @@ final class NettyChannelPublisher extends SubscribablePublisher { private boolean requested; @Nullable private SubscriptionImpl subscription; + /** + * The size of the queue is bound by {@link SubscriptionImpl#request(long)} demand. Using reactive operators to + * transform data and letting ServiceTalk subscribe will take care of backpressure automatically. + */ @Nullable private Queue pending; @Nullable