Skip to content

Commit

Permalink
Detect transport executors with no remaining threads (#11503)
Browse files Browse the repository at this point in the history
Detect misconfigured transport executors with too few threads that could further throttle the transport.

Fixes #11271
  • Loading branch information
shivaspeaks authored Sep 16, 2024
1 parent b8c1aa5 commit 3a6be9c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
33 changes: 33 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,13 @@
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -499,8 +503,15 @@ public Runnable start(Listener listener) {
outboundFlow = new OutboundFlowController(this, frameWriter);
}
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch latchForExtraThread = new CountDownLatch(1);
// The transport needs up to two threads to function once started,
// but only needs one during handshaking. Start another thread during handshaking
// to make sure there's still a free thread available. If the number of threads is exhausted,
// it is better to kill the transport than for all the transports to hang unable to send.
CyclicBarrier barrier = new CyclicBarrier(2);
// Connecting in the serializingExecutor, so that some stream operations like synStream
// will be executed after connected.

serializingExecutor.execute(new Runnable() {
@Override
public void run() {
Expand All @@ -510,8 +521,14 @@ public void run() {
// initial preface.
try {
latch.await();
barrier.await(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TimeoutException | BrokenBarrierException e) {
startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
.withDescription("Timed out waiting for second handshake thread. "
+ "The transport executor pool may have run out of threads"));
return;
}
// Use closed source on failure so that the reader immediately shuts down.
BufferedSource source = Okio.buffer(new Source() {
Expand Down Expand Up @@ -575,6 +592,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
return;
} finally {
clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
latchForExtraThread.countDown();
}
synchronized (lock) {
socket = Preconditions.checkNotNull(sock, "socket");
Expand All @@ -584,6 +602,21 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
}
}
});

executor.execute(new Runnable() {
@Override
public void run() {
try {
barrier.await(1000, TimeUnit.MILLISECONDS);
latchForExtraThread.await();
} catch (BrokenBarrierException | TimeoutException e) {
// Something bad happened, maybe too few threads available!
// This will be handled in the handshake thread.
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// Schedule to send connection preface & settings before any other write.
try {
sendConnectionPrefaceAndSettings();
Expand Down
22 changes: 22 additions & 0 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,28 @@ public void testToString() throws Exception {
assertTrue("Unexpected: " + s, s.contains(address.toString()));
}

@Test
public void testTransportExecutorWithTooFewThreads() throws Exception {
ExecutorService fixedPoolExecutor = Executors.newFixedThreadPool(1);
channelBuilder.transportExecutor(fixedPoolExecutor);
InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415);
clientTransport = new OkHttpClientTransport(
channelBuilder.buildTransportFactory(),
address,
"hostname",
null,
EAG_ATTRS,
NO_PROXY,
tooManyPingsRunnable);
clientTransport.start(transportListener);
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture());
Status capturedStatus = statusCaptor.getValue();
assertEquals("Timed out waiting for second handshake thread. "
+ "The transport executor pool may have run out of threads",
capturedStatus.getDescription());
}

/**
* Test logging is functioning correctly for client received Http/2 frames. Not intended to test
* actual frame content being logged.
Expand Down

0 comments on commit 3a6be9c

Please sign in to comment.