Merge branch 'master' into fix_cdc_compatible
HuSharp authored Feb 2, 2023
2 parents 33dd26e + 35416c4 commit 7423502
Showing 5 changed files with 126 additions and 13 deletions.
113 changes: 106 additions & 7 deletions internal/client/client.go
@@ -114,30 +114,119 @@ type connArray struct {
target string

index uint32
v []*grpc.ClientConn
v []*monitoredConn
// streamTimeout binds with a background goroutine to process coprocessor streaming timeout.
streamTimeout chan *tikvrpc.Lease
dialTimeout time.Duration
// batchConn is not null when batch is enabled.
*batchConn
done chan struct{}

monitor *connMonitor
}

func newConnArray(maxSize uint, addr string, security config.Security,
idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, opts []grpc.DialOption) (*connArray, error) {
idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, m *connMonitor, opts []grpc.DialOption) (*connArray, error) {
a := &connArray{
index: 0,
v: make([]*grpc.ClientConn, maxSize),
v: make([]*monitoredConn, maxSize),
streamTimeout: make(chan *tikvrpc.Lease, 1024),
done: make(chan struct{}),
dialTimeout: dialTimeout,
monitor: m,
}
if err := a.Init(addr, security, idleNotify, enableBatch, opts...); err != nil {
return nil, err
}
return a, nil
}

type connMonitor struct {
m sync.Map
loopOnce sync.Once
stopOnce sync.Once
stop chan struct{}
}

func (c *connMonitor) AddConn(conn *monitoredConn) {
c.m.Store(conn.Name, conn)
}

func (c *connMonitor) RemoveConn(conn *monitoredConn) {
c.m.Delete(conn.Name)
for state := connectivity.Idle; state <= connectivity.Shutdown; state++ {
metrics.TiKVGrpcConnectionState.WithLabelValues(conn.Name, conn.Target(), state.String()).Set(0)
}
}

func (c *connMonitor) Start() {
c.loopOnce.Do(
func() {
c.stop = make(chan struct{})
go c.start()
},
)
}

func (c *connMonitor) Stop() {
c.stopOnce.Do(
func() {
if c.stop != nil {
close(c.stop)
}
},
)
}

func (c *connMonitor) start() {

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.m.Range(func(_, value interface{}) bool {
conn := value.(*monitoredConn)
nowState := conn.GetState()
for state := connectivity.Idle; state <= connectivity.Shutdown; state++ {
if state == nowState {
metrics.TiKVGrpcConnectionState.WithLabelValues(conn.Name, conn.Target(), nowState.String()).Set(1)
} else {
metrics.TiKVGrpcConnectionState.WithLabelValues(conn.Name, conn.Target(), state.String()).Set(0)
}
}
return true
})
case <-c.stop:
return
}
}
}

type monitoredConn struct {
*grpc.ClientConn
Name string
}

func (a *connArray) monitoredDial(ctx context.Context, connName, target string, opts ...grpc.DialOption) (conn *monitoredConn, err error) {
conn = &monitoredConn{
Name: connName,
}
conn.ClientConn, err = grpc.DialContext(ctx, target, opts...)
if err != nil {
return nil, err
}
a.monitor.AddConn(conn)
return conn, nil
}

func (c *monitoredConn) Close() error {
if c.ClientConn != nil {
return c.ClientConn.Close()
}
return nil
}

func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, enableBatch bool, opts ...grpc.DialOption) error {
a.target = addr

@@ -198,11 +287,13 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
}),
}, opts...)

conn, err := grpc.DialContext(
conn, err := a.monitoredDial(
ctx,
fmt.Sprintf("%s-%d", a.target, i),
addr,
opts...,
)

cancel()
if err != nil {
// Cleanup if the initialization fails.
@@ -214,7 +305,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
if allowBatch {
batchClient := &batchCommandsClient{
target: a.target,
conn: conn,
conn: conn.ClientConn,
forwardedClients: make(map[string]*batchCommandsStream),
batched: sync.Map{},
epoch: 0,
@@ -237,7 +328,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint

func (a *connArray) Get() *grpc.ClientConn {
next := atomic.AddUint32(&a.index, 1) % uint32(len(a.v))
return a.v[next]
return a.v[next].ClientConn
}

func (a *connArray) Close() {
@@ -249,6 +340,9 @@ func (a *connArray) Close() {
if c != nil {
err := c.Close()
tikverr.Log(err)
if err == nil {
a.monitor.RemoveConn(c)
}
}
}

@@ -301,6 +395,8 @@ type RPCClient struct {
// Periodically check whether there is any connection that is idle and then close and remove these connections.
// Implement background cleanup.
isClosed bool

connMonitor *connMonitor
}

// NewRPCClient creates a client that manages connections and rpc calls with tikv-servers.
@@ -310,10 +406,12 @@ func NewRPCClient(opts ...Opt) *RPCClient {
option: &option{
dialTimeout: dialTimeout,
},
connMonitor: &connMonitor{},
}
for _, opt := range opts {
opt(cli.option)
}
cli.connMonitor.Start()
return cli
}

@@ -352,14 +450,14 @@ func (c *RPCClient) createConnArray(addr string, enableBatch bool, opts ...func(
for _, opt := range opts {
opt(&client)
}

array, err = newConnArray(
client.GrpcConnectionCount,
addr,
c.option.security,
&c.idleNotify,
enableBatch,
c.option.dialTimeout,
c.connMonitor,
c.option.gRPCDialOptions)

if err != nil {
@@ -663,6 +761,7 @@ func (c *RPCClient) getMPPStreamResponse(ctx context.Context, client tikvpb.Tikv
func (c *RPCClient) Close() error {
// TODO: add a unit test for SendRequest After Closed
c.closeConns()
c.connMonitor.Stop()
return nil
}

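For context, here is a minimal, self-contained sketch of the polling pattern that connMonitor.start introduces: once per second, publish a one-hot gauge per gRPC connectivity state for every tracked connection. The package name, grpcConnectionState variable, and pollConnStates helper are illustrative stand-ins, not part of this commit:

package connmon

import (
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

// grpcConnectionState is a stand-in for metrics.TiKVGrpcConnectionState.
var grpcConnectionState = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "grpc_connection_state",
	Help: "State of gRPC connection",
}, []string{"connection_id", "store_ip", "grpc_state"})

// pollConnStates mirrors connMonitor.start: every second it sets the gauge to 1
// for each connection's current state and to 0 for the other states, so every
// (connection_id, store_ip) pair exposes exactly one non-zero series.
func pollConnStates(conns *sync.Map, stop <-chan struct{}) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			conns.Range(func(name, value interface{}) bool {
				conn := value.(*grpc.ClientConn)
				now := conn.GetState()
				for s := connectivity.Idle; s <= connectivity.Shutdown; s++ {
					v := 0.0
					if s == now {
						v = 1.0
					}
					grpcConnectionState.WithLabelValues(name.(string), conn.Target(), s.String()).Set(v)
				}
				return true
			})
		case <-stop:
			return
		}
	}
}

Writing explicit zeros for the non-current states, rather than deleting stale series, keeps dashboard queries simple at the cost of five time series per connection.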
3 changes: 3 additions & 0 deletions internal/client/client_batch.go
@@ -544,6 +544,9 @@ func (c *batchCommandsClient) waitConnReady() (err error) {
if c.conn.GetState() == connectivity.Ready {
return
}
if c.conn.GetState() == connectivity.Idle {
c.conn.Connect()
}
start := time.Now()
defer func() {
metrics.TiKVBatchClientWaitEstablish.Observe(time.Since(start).Seconds())
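The Idle check added above matters because a gRPC ClientConn in the Idle state does not start dialing until an RPC is issued or Connect is called, so waitConnReady could otherwise block for its whole timeout. A hedged sketch of the wait loop this hunk feeds into (package and function names are illustrative, not the actual waitConnReady implementation):

package connwait

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

// waitReady blocks until conn reaches Ready or ctx expires, nudging an
// Idle connection to start dialing first.
func waitReady(ctx context.Context, conn *grpc.ClientConn) error {
	for {
		s := conn.GetState()
		if s == connectivity.Ready {
			return nil
		}
		if s == connectivity.Idle {
			conn.Connect() // an Idle conn will not dial until an RPC or Connect()
		}
		if !conn.WaitForStateChange(ctx, s) {
			return ctx.Err() // context expired before the state changed
		}
	}
}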
5 changes: 2 additions & 3 deletions internal/client/client_fail_test.go
@@ -60,6 +60,7 @@ func TestPanicInRecvLoop(t *testing.T) {

addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
rpcClient := NewRPCClient()
defer rpcClient.Close()
rpcClient.option.dialTimeout = time.Second / 3

// Start batchRecvLoop, and it should panic in `failPendingRequests`.
@@ -77,8 +78,6 @@ func TestPanicInRecvLoop(t *testing.T) {
req = tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
_, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second*4)
assert.Nil(t, err)

rpcClient.closeConns()
}

func TestRecvErrorInMultipleRecvLoops(t *testing.T) {
Expand All @@ -94,7 +93,7 @@ func TestRecvErrorInMultipleRecvLoops(t *testing.T) {
conf.TiKVClient.GrpcConnectionCount = 1
})()
rpcClient := NewRPCClient()
defer rpcClient.closeConns()
defer rpcClient.Close()

// Create 4 BatchCommands streams.
prewriteReq := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{})
8 changes: 5 additions & 3 deletions internal/client/client_test.go
@@ -64,6 +64,7 @@ func TestConn(t *testing.T) {
})()

client := NewRPCClient()
defer client.Close()

addr := "127.0.0.1:6379"
conn1, err := client.getConnArray(addr, true)
@@ -88,6 +89,7 @@

func TestGetConnAfterClose(t *testing.T) {
client := NewRPCClient()
defer client.Close()

addr := "127.0.0.1:6379"
connArray, err := client.getConnArray(addr, true)
@@ -116,6 +118,7 @@ func TestSendWhenReconnect(t *testing.T) {
require.True(t, port > 0)

rpcClient := NewRPCClient()
defer rpcClient.Close()
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
conn, err := rpcClient.getConnArray(addr, true)
assert.Nil(t, err)
@@ -128,7 +131,6 @@ func TestSendWhenReconnect(t *testing.T) {
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
_, err = rpcClient.SendRequest(context.Background(), addr, req, 100*time.Second)
assert.True(t, err.Error() == "no available connections")
conn.Close()
server.Stop()
}

@@ -247,7 +249,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) {
conf.TiKVClient.GrpcConnectionCount = 1
})()
rpcClient := NewRPCClient()
defer rpcClient.closeConns()
defer rpcClient.Close()

var checkCnt uint64
// Check no corresponding metadata if ForwardedHost is empty.
@@ -316,7 +318,7 @@ func TestForwardMetadataByBatchCommands(t *testing.T) {
conf.TiKVClient.GrpcConnectionCount = 1
})()
rpcClient := NewRPCClient()
defer rpcClient.closeConns()
defer rpcClient.Close()

var checkCnt uint64
setCheckHandler := func(forwardedHost string) {
10 changes: 10 additions & 0 deletions metrics/metrics.go
@@ -96,6 +96,7 @@ var (
TiKVReadThroughput prometheus.Histogram
TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec
TiKVGrpcConnectionState *prometheus.GaugeVec
)

// Label constants.
@@ -589,6 +590,14 @@ func initMetrics(namespace, subsystem string) {
Help: "Counter of assertions used in prewrite requests",
}, []string{LblType})

TiKVGrpcConnectionState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "grpc_connection_state",
Help: "State of gRPC connection",
}, []string{"connection_id", "store_ip", "grpc_state"})

initShortcuts()
}

@@ -659,6 +668,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVReadThroughput)
prometheus.MustRegister(TiKVUnsafeDestroyRangeFailuresCounterVec)
prometheus.MustRegister(TiKVPrewriteAssertionUsageCounter)
prometheus.MustRegister(TiKVGrpcConnectionState)
}

// readCounter reads the value of a prometheus.Counter.
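Assuming client-go's default metric namespace and subsystem (tikv and client_go) and a store listening on 127.0.0.1:20160, each monitored connection would export series along these lines once the gauge is registered (illustrative sample output, not taken from the commit):

tikv_client_go_grpc_connection_state{connection_id="127.0.0.1:20160-0",store_ip="127.0.0.1:20160",grpc_state="READY"} 1
tikv_client_go_grpc_connection_state{connection_id="127.0.0.1:20160-0",store_ip="127.0.0.1:20160",grpc_state="IDLE"} 0

Here connection_id is the "<address>-<index>" name that monitoredDial assigns, store_ip is the dial target, and grpc_state takes one of IDLE, CONNECTING, READY, TRANSIENT_FAILURE, or SHUTDOWN.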
