diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index a2c93d4ef8c..ea77f13892a 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -414,7 +414,7 @@ private void scheduleNextChunk() { // Schedule the next response chunk if there is one. Chunk nextChunk = chunks.peek(); - if (nextChunk != null) { + if (nextChunk != null && !executor.isShutdown()) { scheduled = true; // TODO(ejona): cancel future if RPC is cancelled Future unused = executor.schedule(new LogExceptionRunnable(dispatchTask), diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java index c86bd8070a0..c94e95704ac 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/NettyFlowControlTest.java @@ -147,10 +147,11 @@ private void doTest(int bandwidth, int latency) throws InterruptedException { // deal with cases that either don't cause a window update or hit max window expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW)); - // Range looks large, but this allows for only one extra/missed window update + // Range looks large, but this allows for only one extra/missed window update plus + // bdpPing variations. // (one extra update causes a 2x difference and one missed update causes a .5x difference) assertTrue("Window was " + lastWindow + " expecting " + expectedWindow, - lastWindow < 2 * expectedWindow); + lastWindow < 2.2 * expectedWindow); assertTrue("Window was " + lastWindow + " expecting " + expectedWindow, expectedWindow < 2 * lastWindow); } @@ -194,6 +195,7 @@ private static class TestStreamObserver implements StreamObserver grpcHandlerRef, long window) { @@ -206,9 +208,18 @@ public TestStreamObserver( public void onNext(StreamingOutputCallResponse value) { GrpcHttp2ConnectionHandler grpcHandler = grpcHandlerRef.get(); Http2Stream connectionStream = grpcHandler.connection().connectionStream(); - lastWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream); - if (lastWindow >= expectedWindow) { - onCompleted(); + int curWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream); + synchronized (this) { + if (curWindow >= expectedWindow) { + if (wasCompleted) { + return; + } + wasCompleted = true; + lastWindow = curWindow; + onCompleted(); + } else if (!wasCompleted) { + lastWindow = curWindow; + } } } diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index 80d36b34b02..e8595961613 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -218,13 +218,13 @@ public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) { public void onError(Throwable t) { logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t); response.setException(t); - throttler.registerBackendResponse(false); + throttler.registerBackendResponse(true); helper.propagateRlsError(); } @Override public void onCompleted() { - throttler.registerBackendResponse(true); + throttler.registerBackendResponse(false); } }); return response; diff --git a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java index a6e7b7d80c7..120a486dec6 100644 --- a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java +++ b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java @@ -151,6 +151,7 @@ public void uncaughtException(Thread t, Throwable e) { private String rlsChannelOverriddenAuthority; private void setUpRlsLbClient() { + fakeThrottler.resetCounts(); rlsLbClient = CachingRlsLbClient.newBuilder() .setBackoffProvider(fakeBackoffProvider) @@ -362,6 +363,8 @@ public void get_updatesLbState() throws Exception { assertThat(pickResult.getStatus().isOk()).isTrue(); assertThat(pickResult.getSubchannel()).isNotNull(); assertThat(headers.get(RLS_DATA_KEY)).isEqualTo("header-rls-data-value"); + assertThat(fakeThrottler.getNumThrottled()).isEqualTo(0); + assertThat(fakeThrottler.getNumUnthrottled()).isEqualTo(1); // move backoff further back to only test error behavior fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS); @@ -388,6 +391,97 @@ public void get_updatesLbState() throws Exception { CallOptions.DEFAULT)); assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(pickResult.getStatus().getDescription()).contains("fallback not available"); + assertThat(fakeThrottler.getNumThrottled()).isEqualTo(1); + assertThat(fakeThrottler.getNumUnthrottled()).isEqualTo(1); + } + + @Test + public void get_withAdaptiveThrottler() throws Exception { + AdaptiveThrottler adaptiveThrottler = + new AdaptiveThrottler.Builder() + .setHistorySeconds(1) + .setRatioForAccepts(1.0f) + .setRequestsPadding(1) + .setTicker(fakeClock.getTicker()) + .build(); + + this.rlsLbClient = + CachingRlsLbClient.newBuilder() + .setBackoffProvider(fakeBackoffProvider) + .setResolvedAddressesFactory(resolvedAddressFactory) + .setEvictionListener(evictionListener) + .setHelper(helper) + .setLbPolicyConfig(lbPolicyConfiguration) + .setThrottler(adaptiveThrottler) + .setTicker(fakeClock.getTicker()) + .build(); + InOrder inOrder = inOrder(helper); + RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of( + "server", "bigtable.googleapis.com", "service-key", "service1", "method-key", "create")); + rlsServerImpl.setLookupTable( + ImmutableMap.of( + routeLookupRequest, + RouteLookupResponse.create( + ImmutableList.of("primary.cloudbigtable.googleapis.com"), + "header-rls-data-value"))); + + // valid channel + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + assertThat(resp.isPending()).isTrue(); + fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + + resp = getInSyncContext(routeLookupRequest); + assertThat(resp.hasData()).isTrue(); + + ArgumentCaptor pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class); + ArgumentCaptor stateCaptor = + ArgumentCaptor.forClass(ConnectivityState.class); + inOrder.verify(helper, times(2)) + .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); + + Metadata headers = new Metadata(); + PickResult pickResult = pickerCaptor.getValue().pickSubchannel( + new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod().toBuilder().setFullMethodName("service1/create") + .build(), + headers, + CallOptions.DEFAULT)); + assertThat(pickResult.getSubchannel()).isNotNull(); + assertThat(headers.get(RLS_DATA_KEY)).isEqualTo("header-rls-data-value"); + + // move backoff further back to only test error behavior + fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS); + // try to get invalid + RouteLookupRequest invalidRouteLookupRequest = + RouteLookupRequest.create(ImmutableMap.of()); + CachedRouteLookupResponse errorResp = getInSyncContext(invalidRouteLookupRequest); + assertThat(errorResp.isPending()).isTrue(); + fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + + errorResp = getInSyncContext(invalidRouteLookupRequest); + assertThat(errorResp.hasError()).isTrue(); + + // Channel is still READY because the subchannel for method /service1/create is still READY. + // Method /doesn/exists will use fallback child balancer and fail immediately. + inOrder.verify(helper) + .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + PickSubchannelArgsImpl invalidArgs = getInvalidArgs(headers); + pickResult = pickerCaptor.getValue().pickSubchannel(invalidArgs); + assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(pickResult.getStatus().getDescription()).contains("fallback not available"); + long time = fakeClock.getTicker().read(); + assertThat(adaptiveThrottler.requestStat.get(time)).isEqualTo(2L); + assertThat(adaptiveThrottler.throttledStat.get(time)).isEqualTo(1L); + } + + private PickSubchannelArgsImpl getInvalidArgs(Metadata headers) { + PickSubchannelArgsImpl invalidArgs = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod().toBuilder() + .setFullMethodName("doesn/exists") + .build(), + headers, + CallOptions.DEFAULT); + return invalidArgs; } @Test @@ -755,6 +849,8 @@ public ChannelLogger getChannelLogger() { } private static final class FakeThrottler implements Throttler { + int numUnthrottled; + int numThrottled; private boolean nextResult = false; @@ -765,7 +861,24 @@ public boolean shouldThrottle() { @Override public void registerBackendResponse(boolean throttled) { - // no-op + if (throttled) { + numThrottled++; + } else { + numUnthrottled++; + } + } + + public int getNumUnthrottled() { + return numUnthrottled; + } + + public int getNumThrottled() { + return numThrottled; + } + + public void resetCounts() { + numThrottled = 0; + numUnthrottled = 0; } } }