Skip to content

Commit

Permalink
xds: Update logic so that an error being reported when stream is clos…
Browse files Browse the repository at this point in the history
…ed gets propagated to subscribers (#9827)

* Stop setting waitForReady in XdsClient's AbstractXdsClient.
* Handle bad URL cleanly.  

Fix test cases to deal with asynchronous flow.
  • Loading branch information
larry-safran committed Jan 25, 2023
1 parent b0635fa commit 501ca8f
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 12 deletions.
23 changes: 17 additions & 6 deletions xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
* the xDS RPC stream.
*/
final class AbstractXdsClient {

public static final String CLOSED_BY_SERVER = "Closed by server";
private final SynchronizationContext syncContext;
private final InternalLogId logId;
private final XdsLogger logger;
Expand Down Expand Up @@ -217,6 +219,11 @@ void readyHandler() {
return;
}

if (isInBackoff()) {
rpcRetryTimer.cancel();
rpcRetryTimer = null;
}

timerLaunch.startSubscriberTimersIfNeeded(serverInfo);
}

Expand Down Expand Up @@ -315,21 +322,25 @@ final void handleRpcError(Throwable t) {
}

final void handleRpcCompleted() {
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
}

private void handleRpcStreamClosed(Status error) {
checkArgument(!error.isOk(), "unexpected OK status");
if (closed) {
return;
}

checkArgument(!error.isOk(), "unexpected OK status");
String errorMsg = error.getDescription() != null
&& error.getDescription().equals(CLOSED_BY_SERVER)
? "ADS stream closed with status {0}: {1}. Cause: {2}"
: "ADS stream failed with status {0}: {1}. Cause: {2}";
logger.log(
XdsLogLevel.ERROR,
"ADS stream closed with status {0}: {1}. Cause: {2}",
error.getCode(), error.getDescription(), error.getCause());
XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
closed = true;
xdsResponseHandler.handleStreamClosed(error);
cleanUp();

if (responseReceived || retryBackoffPolicy == null) {
// Reset the backoff sequence if had received a response, or backoff sequence
// has never been initialized.
Expand Down Expand Up @@ -423,7 +434,7 @@ public void run() {
});
}
};
requestWriter = stub.withWaitForReady().streamAggregatedResources(responseReader);
requestWriter = stub.streamAggregatedResources(responseReader);
}

@Override
Expand Down
17 changes: 14 additions & 3 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -514,11 +514,22 @@ private final class ResourceSubscriber<T extends ResourceUpdate> {
// Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
// is created but not yet requested because the client is in backoff.
this.metadata = ResourceMetadata.newResourceMetadataUnknown();
maybeCreateXdsChannelWithLrs(serverInfo);
this.xdsChannel = serverChannelMap.get(serverInfo);
if (xdsChannel.isInBackoff()) {

AbstractXdsClient xdsChannelTemp = null;
try {
maybeCreateXdsChannelWithLrs(serverInfo);
xdsChannelTemp = serverChannelMap.get(serverInfo);
if (xdsChannelTemp.isInBackoff()) {
return;
}
} catch (IllegalArgumentException e) {
xdsChannelTemp = null;
this.errorDescription = "Bad configuration: " + e.getMessage();
return;
} finally {
this.xdsChannel = xdsChannelTemp;
}

restartTimer();
}

Expand Down
76 changes: 73 additions & 3 deletions xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.xds.XdsClientImpl.XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -70,6 +71,7 @@
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.Bootstrapper.AuthorityInfo;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.CertificateProviderInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.Endpoints.DropOverload;
Expand Down Expand Up @@ -114,6 +116,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
Expand Down Expand Up @@ -3226,7 +3229,8 @@ public void streamClosedAndRetryWithBackoff() {

// Management server closes the RPC stream with an error.
call.sendError(Status.UNKNOWN.asException());
verify(ldsResourceWatcher).onError(errorCaptor.capture());
verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
.onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, "");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, "");
Expand Down Expand Up @@ -3336,7 +3340,8 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() {
RDS_RESOURCE, rdsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
call.sendError(Status.UNAVAILABLE.asException());
verify(ldsResourceWatcher).onError(errorCaptor.capture());
verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
.onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
Expand Down Expand Up @@ -3573,13 +3578,18 @@ public void sendingToStoppedServer() throws Exception {
.build()
.start());
fakeClock.forwardTime(5, TimeUnit.SECONDS);
verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE);
fakeClock.forwardTime(20, TimeUnit.SECONDS); // Trigger rpcRetryTimer
DiscoveryRpcCall call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS);
if (call == null) { // The first rpcRetry may have happened before the channel was ready
fakeClock.forwardTime(50, TimeUnit.SECONDS);
call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS);
}

// NOTE: There is a ScheduledExecutorService that may get involved due to the reconnect
// so you cannot rely on the logic being single threaded. The timeout() in verifyRequest
// is therefore necessary to avoid flakiness.
// Send a response and do verifications
verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE);
call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0001");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0001", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
Expand All @@ -3592,6 +3602,66 @@ public void sendingToStoppedServer() throws Exception {
}
}

@Test
public void sendToBadUrl() throws Exception {
// Setup xdsClient to fail on stream creation
XdsClientImpl client = createXdsClient("some. garbage");

client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher);
fakeClock.forwardTime(20, TimeUnit.SECONDS);
verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any());
client.shutdown();
}

@Test
public void sendToNonexistentHost() throws Exception {
// Setup xdsClient to fail on stream creation
XdsClientImpl client = createXdsClient("some.garbage");
client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher);
fakeClock.forwardTime(20, TimeUnit.SECONDS);

verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any());
fakeClock.forwardTime(50, TimeUnit.SECONDS); // Trigger rpcRetry if appropriate
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
client.shutdown();
}

private XdsClientImpl createXdsClient(String serverUri) {
BootstrapInfo bootstrapInfo = buildBootStrap(serverUri);
return new XdsClientImpl(
DEFAULT_XDS_CHANNEL_FACTORY,
bootstrapInfo,
Context.ROOT,
fakeClock.getScheduledExecutorService(),
backoffPolicyProvider,
fakeClock.getStopwatchSupplier(),
timeProvider,
tlsContextManager);
}

private BootstrapInfo buildBootStrap(String serverUri) {

ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS,
ignoreResourceDeletion());

return Bootstrapper.BootstrapInfo.builder()
.servers(Collections.singletonList(xdsServerInfo))
.node(NODE)
.authorities(ImmutableMap.of(
"authority.xds.com",
AuthorityInfo.create(
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))),
"",
AuthorityInfo.create(
"xdstp:///envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
.certProviders(ImmutableMap.of("cert-instance-name",
CertificateProviderInfo.create("file-watcher", ImmutableMap.<String, Object>of())))
.build();
}

private <T extends ResourceUpdate> DiscoveryRpcCall startResourceWatcher(
XdsResourceType<T> type, String name, ResourceWatcher<T> watcher) {
Expand Down

0 comments on commit 501ca8f

Please sign in to comment.