Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rls:Fix throttling in route lookup (b/262779100) #9874

Merged
merged 12 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ private void scheduleNextChunk() {

// Schedule the next response chunk if there is one.
Chunk nextChunk = chunks.peek();
if (nextChunk != null) {
if (nextChunk != null && !executor.isShutdown()) {
scheduled = true;
// TODO(ejona): cancel future if RPC is cancelled
Future<?> unused = executor.schedule(new LogExceptionRunnable(dispatchTask),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,11 @@ private void doTest(int bandwidth, int latency) throws InterruptedException {
// deal with cases that either don't cause a window update or hit max window
expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW));

// Range looks large, but this allows for only one extra/missed window update
// Range looks large, but this allows for only one extra/missed window update plus
// bdpPing variations.
// (one extra update causes a 2x difference and one missed update causes a .5x difference)
assertTrue("Window was " + lastWindow + " expecting " + expectedWindow,
lastWindow < 2 * expectedWindow);
lastWindow < 2.2 * expectedWindow);
assertTrue("Window was " + lastWindow + " expecting " + expectedWindow,
expectedWindow < 2 * lastWindow);
}
Expand Down Expand Up @@ -194,6 +195,7 @@ private static class TestStreamObserver implements StreamObserver<StreamingOutpu
final CountDownLatch latch = new CountDownLatch(1);
final long expectedWindow;
int lastWindow;
boolean wasCompleted;

public TestStreamObserver(
AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef, long window) {
Expand All @@ -206,9 +208,18 @@ public TestStreamObserver(
public void onNext(StreamingOutputCallResponse value) {
GrpcHttp2ConnectionHandler grpcHandler = grpcHandlerRef.get();
Http2Stream connectionStream = grpcHandler.connection().connectionStream();
lastWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream);
if (lastWindow >= expectedWindow) {
onCompleted();
int curWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream);
synchronized (this) {
if (curWindow >= expectedWindow) {
if (wasCompleted) {
return;
}
wasCompleted = true;
lastWindow = curWindow;
onCompleted();
} else if (!wasCompleted) {
lastWindow = curWindow;
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
public void onError(Throwable t) {
logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
response.setException(t);
throttler.registerBackendResponse(false);
throttler.registerBackendResponse(true);
helper.propagateRlsError();
}

@Override
public void onCompleted() {
throttler.registerBackendResponse(true);
throttler.registerBackendResponse(false);
}
});
return response;
Expand Down
115 changes: 114 additions & 1 deletion rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void uncaughtException(Thread t, Throwable e) {
private String rlsChannelOverriddenAuthority;

private void setUpRlsLbClient() {
fakeThrottler.resetCounts();
rlsLbClient =
CachingRlsLbClient.newBuilder()
.setBackoffProvider(fakeBackoffProvider)
Expand Down Expand Up @@ -362,6 +363,8 @@ public void get_updatesLbState() throws Exception {
assertThat(pickResult.getStatus().isOk()).isTrue();
assertThat(pickResult.getSubchannel()).isNotNull();
assertThat(headers.get(RLS_DATA_KEY)).isEqualTo("header-rls-data-value");
assertThat(fakeThrottler.getNumThrottled()).isEqualTo(0);
assertThat(fakeThrottler.getNumUnthrottled()).isEqualTo(1);

// move backoff further back to only test error behavior
fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
Expand All @@ -388,6 +391,97 @@ public void get_updatesLbState() throws Exception {
CallOptions.DEFAULT));
assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(pickResult.getStatus().getDescription()).contains("fallback not available");
assertThat(fakeThrottler.getNumThrottled()).isEqualTo(1);
assertThat(fakeThrottler.getNumUnthrottled()).isEqualTo(1);
}

@Test
public void get_withAdaptiveThrottler() throws Exception {
AdaptiveThrottler adaptiveThrottler =
new AdaptiveThrottler.Builder()
.setHistorySeconds(1)
.setRatioForAccepts(1.0f)
.setRequestsPadding(1)
.setTicker(fakeClock.getTicker())
.build();

this.rlsLbClient =
CachingRlsLbClient.newBuilder()
.setBackoffProvider(fakeBackoffProvider)
.setResolvedAddressesFactory(resolvedAddressFactory)
.setEvictionListener(evictionListener)
.setHelper(helper)
.setLbPolicyConfig(lbPolicyConfiguration)
.setThrottler(adaptiveThrottler)
.setTicker(fakeClock.getTicker())
.build();
InOrder inOrder = inOrder(helper);
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of(
"server", "bigtable.googleapis.com", "service-key", "service1", "method-key", "create"));
rlsServerImpl.setLookupTable(
ImmutableMap.of(
routeLookupRequest,
RouteLookupResponse.create(
ImmutableList.of("primary.cloudbigtable.googleapis.com"),
"header-rls-data-value")));

// valid channel
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
assertThat(resp.isPending()).isTrue();
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);

resp = getInSyncContext(routeLookupRequest);
assertThat(resp.hasData()).isTrue();

ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);
ArgumentCaptor<ConnectivityState> stateCaptor =
ArgumentCaptor.forClass(ConnectivityState.class);
inOrder.verify(helper, times(2))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());

Metadata headers = new Metadata();
PickResult pickResult = pickerCaptor.getValue().pickSubchannel(
new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod().toBuilder().setFullMethodName("service1/create")
.build(),
headers,
CallOptions.DEFAULT));
assertThat(pickResult.getSubchannel()).isNotNull();
assertThat(headers.get(RLS_DATA_KEY)).isEqualTo("header-rls-data-value");

// move backoff further back to only test error behavior
fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
// try to get invalid
RouteLookupRequest invalidRouteLookupRequest =
RouteLookupRequest.create(ImmutableMap.<String, String>of());
CachedRouteLookupResponse errorResp = getInSyncContext(invalidRouteLookupRequest);
assertThat(errorResp.isPending()).isTrue();
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);

errorResp = getInSyncContext(invalidRouteLookupRequest);
assertThat(errorResp.hasError()).isTrue();

// Channel is still READY because the subchannel for method /service1/create is still READY.
// Method /doesn/exists will use fallback child balancer and fail immediately.
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
PickSubchannelArgsImpl invalidArgs = getInvalidArgs(headers);
pickResult = pickerCaptor.getValue().pickSubchannel(invalidArgs);
assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(pickResult.getStatus().getDescription()).contains("fallback not available");
long time = fakeClock.getTicker().read();
assertThat(adaptiveThrottler.requestStat.get(time)).isEqualTo(2L);
assertThat(adaptiveThrottler.throttledStat.get(time)).isEqualTo(1L);
}

private PickSubchannelArgsImpl getInvalidArgs(Metadata headers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor style nit: Methods starting with get usually return something that already exists. I think a prefix of new (or create, etc.) is better suited for methods that create new objects.

PickSubchannelArgsImpl invalidArgs = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod().toBuilder()
.setFullMethodName("doesn/exists")
.build(),
headers,
CallOptions.DEFAULT);
return invalidArgs;
}

@Test
Expand Down Expand Up @@ -755,6 +849,8 @@ public ChannelLogger getChannelLogger() {
}

private static final class FakeThrottler implements Throttler {
int numUnthrottled;
int numThrottled;

private boolean nextResult = false;

Expand All @@ -765,7 +861,24 @@ public boolean shouldThrottle() {

@Override
public void registerBackendResponse(boolean throttled) {
// no-op
if (throttled) {
numThrottled++;
} else {
numUnthrottled++;
}
}

public int getNumUnthrottled() {
return numUnthrottled;
}

public int getNumThrottled() {
return numThrottled;
}

public void resetCounts() {
numThrottled = 0;
numUnthrottled = 0;
}
}
}