Skip to content

Commit

Permalink
xds/resolver: cleanup tests to use real xDS client 2/n (#5952)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Jan 24, 2023
1 parent 52a8392 commit 4adb2a7
Showing 1 changed file with 105 additions and 21 deletions.
126 changes: 105 additions & 21 deletions xds/internal/resolver/xds_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,34 +447,118 @@ func (s) TestResolverResourceName(t *testing.T) {
}
}

// TestXDSResolverWatchCallbackAfterClose tests the case where a service update
// from the underlying xdsClient is received after the resolver is closed.
func (s) TestXDSResolverWatchCallbackAfterClose(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer cancel()
// TestResolverWatchCallbackAfterClose tests the case where a service update
// from the underlying xDS client is received after the resolver is closed, and
// verifies that the update is not propagated to the ClientConn.
func (s) TestResolverWatchCallbackAfterClose(t *testing.T) {
// Setup the management server that synchronizes with the test goroutine
// using two channels. The management server signals the test goroutine when
// it receives a discovery request for a route configuration resource. And
// the test goroutine signals the management server when the resolver is
// closed.
waitForRouteConfigDiscoveryReqCh := make(chan struct{})
waitForResolverCloseCh := make(chan struct{})
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
if req.GetTypeUrl() == version.V3RouteConfigURL {
close(waitForRouteConfigDiscoveryReqCh)
<-waitForResolverCloseCh
}
return nil
},
})
if err != nil {
t.Fatalf("Failed to start xDS management server: %v", err)
}
defer mgmtServer.Stop()

// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
Version: xdsbootstrap.TransportV3,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()

// Configure listener and route configuration resources on the management
// server.
const serviceName = "my-service-client-side-xds"
rdsName := "route-" + serviceName
cdsName := "cluster-" + serviceName
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, cdsName)},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Call the watchAPI callback after closing the resolver, and make sure no
// update is triggerred on the ClientConn.
xdsR.Close()
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}},
},
},
}, nil)
tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
defer rClose()

// Wait for a discovery request for a route configuration resource.
select {
case <-waitForRouteConfigDiscoveryReqCh:
case <-ctx.Done():
t.Fatal("Timeout when waiting for a discovery request for a route configuration resource")
}

// Close the resolver and unblock the management server.
rClose()
close(waitForResolverCloseCh)

// Verify that the update from the management server is not propagated to
// the ClientConn. The xDS resolver, once closed, is expected to drop
// updates from the xDS client.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if gotVal, gotErr := tcc.stateCh.Receive(sCtx); gotErr != context.DeadlineExceeded {
t.Fatalf("ClientConn.UpdateState called after xdsResolver is closed: %v", gotVal)
if _, err := tcc.stateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Fatalf("ClientConn received an update from the resolver that was closed: %v", err)
}
}

// TestResolverCloseClosesXDSClient tests that the xDS resolver's Close method
// closes the xDS client.
func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
bootstrapCfg := &bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: "dummy-management-server-address",
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
TransportAPI: version.TransportV3,
},
}

// Override xDS client creation to use bootstrap configuration pointing to a
// dummy management server. Also close a channel when the returned xDS
// client is closed.
closeCh := make(chan struct{})
origNewClient := newXDSClient
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
c, cancel, err := xdsclient.NewWithConfigForTesting(bootstrapCfg, defaultTestTimeout, defaultTestTimeout)
return c, func() {
close(closeCh)
cancel()
}, err
}
defer func() {
newXDSClient = origNewClient
}()

_, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///my-service-client-side-xds")})
rClose()

select {
case <-closeCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for xDS client to be closed")
}
}

Expand Down

0 comments on commit 4adb2a7

Please sign in to comment.