From da5dca5a528245a20f4255105cbb34c816876ba7 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 29 Nov 2017 22:38:40 -0500 Subject: [PATCH] cli: wait for connection cleanup I refactored this to rely solely on context cancellation in a previous commit. Unfortunately, we run cli tools multiple times in the same process during tests, and we reset globals that are picked up by the RPCContext used by open connections, which results in data races. Return a closure from getClientGRPCConn which waits for the Stopper to stop to circumvent this. --- pkg/cli/debug.go | 3 ++- pkg/cli/haproxy.go | 3 ++- pkg/cli/init.go | 4 +++- pkg/cli/node.go | 13 +++++++++---- pkg/cli/range.go | 16 ++++++++++------ pkg/cli/start.go | 33 ++++++++++++++++++--------------- pkg/cli/zip.go | 3 ++- 7 files changed, 46 insertions(+), 29 deletions(-) diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index eff434f9f03c..19e16dad2405 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -809,10 +809,11 @@ func runDebugGossipValues(cmd *cobra.Command, args []string) error { return errors.Wrap(err, "failed to parse provided file as gossip.InfoStatus") } } else { - conn, _, err := getClientGRPCConn(ctx) + conn, _, finish, err := getClientGRPCConn(ctx) if err != nil { return err } + defer finish() status := serverpb.NewStatusClient(conn) gossipInfo, err = status.Gossip(ctx, &serverpb.GossipRequest{}) diff --git a/pkg/cli/haproxy.go b/pkg/cli/haproxy.go index b65150c5a036..6c36f4ed5c5f 100644 --- a/pkg/cli/haproxy.go +++ b/pkg/cli/haproxy.go @@ -54,10 +54,11 @@ func runGenHAProxyCmd(cmd *cobra.Command, args []string) error { return err } - conn, _, err := getClientGRPCConn(ctx) + conn, _, finish, err := getClientGRPCConn(ctx) if err != nil { return err } + defer finish() c := serverpb.NewStatusClient(conn) nodeStatuses, err := c.Nodes(ctx, &serverpb.NodesRequest{}) diff --git a/pkg/cli/init.go b/pkg/cli/init.go index badbe37b6ac0..97363e972e5a 100644 --- a/pkg/cli/init.go +++ b/pkg/cli/init.go @@ 
-46,10 +46,12 @@ func runInit(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - conn, _, err := getClientGRPCConn(ctx) + conn, _, finish, err := getClientGRPCConn(ctx) if err != nil { return err } + defer finish() + c := serverpb.NewInitClient(conn) if _, err = c.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil { diff --git a/pkg/cli/node.go b/pkg/cli/node.go index 23f46249225e..0e2c0cc753ba 100644 --- a/pkg/cli/node.go +++ b/pkg/cli/node.go @@ -134,10 +134,12 @@ func runStatusNodeInner( var nodeStatuses []status.NodeStatus - conn, _, err := getClientGRPCConn(ctx) + conn, _, finish, err := getClientGRPCConn(ctx) if err != nil { return nil, nil, err } + defer finish() + c := serverpb.NewStatusClient(conn) var decommissionStatusRequest *serverpb.DecommissionStatusRequest @@ -180,10 +182,11 @@ func runStatusNodeInner( return nil, nil, errors.Errorf("expected no arguments or a single node ID") } - cAdmin, err := getAdminClient(ctx) + cAdmin, finish, err := getAdminClient(ctx) if err != nil { return nil, nil, err } + defer finish() decommissionStatusResp, err := cAdmin.DecommissionStatus(ctx, decommissionStatusRequest) if err != nil { @@ -320,10 +323,11 @@ func runDecommissionNode(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c, err := getAdminClient(ctx) + c, finish, err := getAdminClient(ctx) if err != nil { return err } + defer finish() return runDecommissionNodeImpl(ctx, c, nodeCtx.nodeDecommissionWait, args) } @@ -440,10 +444,11 @@ func runRecommissionNode(cmd *cobra.Command, args []string) error { return err } - c, err := getAdminClient(ctx) + c, finish, err := getAdminClient(ctx) if err != nil { return err } + defer finish() req := &serverpb.DecommissionRequest{ NodeIDs: nodeIDs, diff --git a/pkg/cli/range.go b/pkg/cli/range.go index 5bd649993608..f6aca9f90d3f 100644 --- a/pkg/cli/range.go +++ b/pkg/cli/range.go @@ -28,14 
+28,16 @@ import ( ) // MakeDBClient creates a kv client for use in cli tools. -func MakeDBClient(ctx context.Context) (*client.DB, error) { +// Invoking the returned closure closes the underlying connection and waits +// until the associated goroutines have terminated. +func MakeDBClient(ctx context.Context) (*client.DB, func(), error) { // The KV endpoints require the node user. baseCfg.User = security.NodeUser - conn, clock, err := getClientGRPCConn(ctx) + conn, clock, finish, err := getClientGRPCConn(ctx) if err != nil { - return nil, err + return nil, nil, err } - return client.NewDB(client.NewSender(conn), clock), nil + return client.NewDB(client.NewSender(conn), clock), finish, nil } // A lsRangesCmd command lists the ranges in a cluster. @@ -70,10 +72,11 @@ func runLsRanges(cmd *cobra.Command, args []string) error { } endKey := keys.Meta2Prefix.PrefixEnd() - kvDB, err := MakeDBClient(ctx) + kvDB, finish, err := MakeDBClient(ctx) if err != nil { return err } + defer finish() rows, err := kvDB.Scan(ctx, startKey, endKey, maxResults) if err != nil { @@ -115,10 +118,11 @@ func runSplitRange(cmd *cobra.Command, args []string) error { key := roachpb.Key(args[0]) - kvDB, err := MakeDBClient(ctx) + kvDB, finish, err := MakeDBClient(ctx) if err != nil { return err } + defer finish() return errors.Wrap(kvDB.AdminSplit(ctx, key, key), "split failed") } diff --git a/pkg/cli/start.go b/pkg/cli/start.go index 6c9b225e5224..c2e675639e14 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -922,9 +922,11 @@ func addrWithDefaultHost(addr string) (string, error) { return net.JoinHostPort(host, port), nil } -func getClientGRPCConn(ctx context.Context) (*grpc.ClientConn, *hlc.Clock, error) { +// getClientGRPCConn returns a ClientConn, a Clock and a closure that closes +// the connection and blocks until its associated goroutines have terminated.
+func getClientGRPCConn(ctx context.Context) (*grpc.ClientConn, *hlc.Clock, func(), error) { if ctx.Done() == nil { - return nil, nil, errors.New("context must be cancellable") + return nil, nil, nil, errors.New("context must be cancellable") } // 0 to disable max offset checks; this RPC context is not a member of the // cluster, so there's no need to enforce that its max offset is the same @@ -939,31 +941,31 @@ func getClientGRPCConn(ctx context.Context) (*grpc.ClientConn, *hlc.Clock, error ) addr, err := addrWithDefaultHost(serverCfg.AdvertiseAddr) if err != nil { - return nil, nil, err + return nil, nil, nil, err } conn, err := rpcContext.GRPCDial(addr) if err != nil { - return nil, nil, err + return nil, nil, nil, err } stopper.AddCloser(stop.CloserFn(func() { _ = conn.Close() })) // Tie the lifetime of the stopper to that of the context. - go func() { - <-ctx.Done() - // Don't use the `ctx` as it's already cancelled. - stopper.Stop(context.Background()) - }() - return conn, clock, nil + closer := func() { + stopper.Stop(ctx) + } + return conn, clock, closer, nil } -func getAdminClient(ctx context.Context) (serverpb.AdminClient, error) { - conn, _, err := getClientGRPCConn(ctx) +// getAdminClient returns an AdminClient and a closure that must be invoked +// to free associated resources. +func getAdminClient(ctx context.Context) (serverpb.AdminClient, func(), error) { + conn, _, finish, err := getClientGRPCConn(ctx) if err != nil { - return nil, err + return nil, nil, err } - return serverpb.NewAdminClient(conn), nil + return serverpb.NewAdminClient(conn), finish, nil } // quitCmd command shuts down the node server. 
@@ -1064,10 +1066,11 @@ func runQuit(cmd *cobra.Command, args []string) (err error) { onModes[i] = int32(m) } - c, err := getAdminClient(ctx) + c, finish, err := getAdminClient(ctx) if err != nil { return err } + defer finish() if quitCtx.serverDecommission { var myself []string // will remain empty, which means target yourself diff --git a/pkg/cli/zip.go b/pkg/cli/zip.go index 891e8fe7ad35..4e7926427fed 100644 --- a/pkg/cli/zip.go +++ b/pkg/cli/zip.go @@ -112,10 +112,11 @@ func runDebugZip(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - conn, _, err := getClientGRPCConn(ctx) + conn, _, finish, err := getClientGRPCConn(ctx) if err != nil { return err } + defer finish() status := serverpb.NewStatusClient(conn) admin := serverpb.NewAdminClient(conn)