Skip to content

Commit

Permalink
cli: wait for connection cleanup
Browse files Browse the repository at this point in the history
I refactored this to rely solely on context cancellation in
a previous commit. Unfortunately, we run cli tools multiple
times in the same process during tests, and we reset globals
that are picked up by the RPCContext used by open connections,
which results in data races.

Return a closure from getClientGRPCConn which waits for the
Stopper to stop to circumvent this.
  • Loading branch information
tbg committed Nov 30, 2017
1 parent 36e49db commit da5dca5
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 29 deletions.
3 changes: 2 additions & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,10 +809,11 @@ func runDebugGossipValues(cmd *cobra.Command, args []string) error {
return errors.Wrap(err, "failed to parse provided file as gossip.InfoStatus")
}
} else {
conn, _, err := getClientGRPCConn(ctx)
conn, _, finish, err := getClientGRPCConn(ctx)
if err != nil {
return err
}
defer finish()

status := serverpb.NewStatusClient(conn)
gossipInfo, err = status.Gossip(ctx, &serverpb.GossipRequest{})
Expand Down
3 changes: 2 additions & 1 deletion pkg/cli/haproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ func runGenHAProxyCmd(cmd *cobra.Command, args []string) error {
return err
}

conn, _, err := getClientGRPCConn(ctx)
conn, _, finish, err := getClientGRPCConn(ctx)
if err != nil {
return err
}
defer finish()
c := serverpb.NewStatusClient(conn)

nodeStatuses, err := c.Nodes(ctx, &serverpb.NodesRequest{})
Expand Down
4 changes: 3 additions & 1 deletion pkg/cli/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ func runInit(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

conn, _, err := getClientGRPCConn(ctx)
conn, _, finish, err := getClientGRPCConn(ctx)
if err != nil {
return err
}
defer finish()

c := serverpb.NewInitClient(conn)

if _, err = c.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil {
Expand Down
13 changes: 9 additions & 4 deletions pkg/cli/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,12 @@ func runStatusNodeInner(

var nodeStatuses []status.NodeStatus

conn, _, err := getClientGRPCConn(ctx)
conn, _, finish, err := getClientGRPCConn(ctx)
if err != nil {
return nil, nil, err
}
defer finish()

c := serverpb.NewStatusClient(conn)

var decommissionStatusRequest *serverpb.DecommissionStatusRequest
Expand Down Expand Up @@ -180,10 +182,11 @@ func runStatusNodeInner(
return nil, nil, errors.Errorf("expected no arguments or a single node ID")
}

cAdmin, err := getAdminClient(ctx)
cAdmin, finish, err := getAdminClient(ctx)
if err != nil {
return nil, nil, err
}
defer finish()

decommissionStatusResp, err := cAdmin.DecommissionStatus(ctx, decommissionStatusRequest)
if err != nil {
Expand Down Expand Up @@ -320,10 +323,11 @@ func runDecommissionNode(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c, err := getAdminClient(ctx)
c, finish, err := getAdminClient(ctx)
if err != nil {
return err
}
defer finish()

return runDecommissionNodeImpl(ctx, c, nodeCtx.nodeDecommissionWait, args)
}
Expand Down Expand Up @@ -440,10 +444,11 @@ func runRecommissionNode(cmd *cobra.Command, args []string) error {
return err
}

c, err := getAdminClient(ctx)
c, finish, err := getAdminClient(ctx)
if err != nil {
return err
}
defer finish()

req := &serverpb.DecommissionRequest{
NodeIDs: nodeIDs,
Expand Down
16 changes: 10 additions & 6 deletions pkg/cli/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ import (
)

// MakeDBClient creates a kv client for use in cli tools.
func MakeDBClient(ctx context.Context) (*client.DB, error) {
// Invoking the returned closure closes the underlying connection and waits
// until the associated goroutines have terminated.
func MakeDBClient(ctx context.Context) (*client.DB, func(), error) {
// The KV endpoints require the node user.
baseCfg.User = security.NodeUser
conn, clock, err := getClientGRPCConn(ctx)
conn, clock, finish, err := getClientGRPCConn(ctx)
if err != nil {
return nil, err
return nil, nil, err
}
return client.NewDB(client.NewSender(conn), clock), nil
return client.NewDB(client.NewSender(conn), clock), finish, nil
}

// A lsRangesCmd command lists the ranges in a cluster.
Expand Down Expand Up @@ -70,10 +72,11 @@ func runLsRanges(cmd *cobra.Command, args []string) error {
}
endKey := keys.Meta2Prefix.PrefixEnd()

kvDB, err := MakeDBClient(ctx)
kvDB, finish, err := MakeDBClient(ctx)
if err != nil {
return err
}
defer finish()

rows, err := kvDB.Scan(ctx, startKey, endKey, maxResults)
if err != nil {
Expand Down Expand Up @@ -115,10 +118,11 @@ func runSplitRange(cmd *cobra.Command, args []string) error {

key := roachpb.Key(args[0])

kvDB, err := MakeDBClient(ctx)
kvDB, finish, err := MakeDBClient(ctx)
if err != nil {
return err
}
defer finish()
return errors.Wrap(kvDB.AdminSplit(ctx, key, key), "split failed")
}

Expand Down
33 changes: 18 additions & 15 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,9 +922,11 @@ func addrWithDefaultHost(addr string) (string, error) {
return net.JoinHostPort(host, port), nil
}

func getClientGRPCConn(ctx context.Context) (*grpc.ClientConn, *hlc.Clock, error) {
// getClientGRPCConn returns a ClientConn, a Clock and a method that blocks
// until the connection (and its associated goroutines) have terminated.
func getClientGRPCConn(ctx context.Context) (*grpc.ClientConn, *hlc.Clock, func(), error) {
if ctx.Done() == nil {
return nil, nil, errors.New("context must be cancellable")
return nil, nil, nil, errors.New("context must be cancellable")
}
// 0 to disable max offset checks; this RPC context is not a member of the
// cluster, so there's no need to enforce that its max offset is the same
Expand All @@ -939,31 +941,31 @@ func getClientGRPCConn(ctx context.Context) (*grpc.ClientConn, *hlc.Clock, error
)
addr, err := addrWithDefaultHost(serverCfg.AdvertiseAddr)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
conn, err := rpcContext.GRPCDial(addr)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
stopper.AddCloser(stop.CloserFn(func() {
_ = conn.Close()
}))

// Tie the lifetime of the stopper to that of the context.
go func() {
<-ctx.Done()
// Don't use the `ctx` as it's already cancelled.
stopper.Stop(context.Background())
}()
return conn, clock, nil
closer := func() {
stopper.Stop(ctx)
}
return conn, clock, closer, nil
}

func getAdminClient(ctx context.Context) (serverpb.AdminClient, error) {
conn, _, err := getClientGRPCConn(ctx)
// getAdminClient returns an AdminClient and a closure that must be invoked
// to free associated resources.
func getAdminClient(ctx context.Context) (serverpb.AdminClient, func(), error) {
conn, _, finish, err := getClientGRPCConn(ctx)
if err != nil {
return nil, err
return nil, nil, err
}
return serverpb.NewAdminClient(conn), nil
return serverpb.NewAdminClient(conn), finish, nil
}

// quitCmd command shuts down the node server.
Expand Down Expand Up @@ -1064,10 +1066,11 @@ func runQuit(cmd *cobra.Command, args []string) (err error) {
onModes[i] = int32(m)
}

c, err := getAdminClient(ctx)
c, finish, err := getAdminClient(ctx)
if err != nil {
return err
}
defer finish()

if quitCtx.serverDecommission {
var myself []string // will remain empty, which means target yourself
Expand Down
3 changes: 2 additions & 1 deletion pkg/cli/zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ func runDebugZip(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

conn, _, err := getClientGRPCConn(ctx)
conn, _, finish, err := getClientGRPCConn(ctx)
if err != nil {
return err
}
defer finish()

status := serverpb.NewStatusClient(conn)
admin := serverpb.NewAdminClient(conn)
Expand Down

0 comments on commit da5dca5

Please sign in to comment.