diff --git a/store/tikv/client.go b/store/tikv/client.go
index 2ec83f9e6d61d..6a483ba3ae8f8 100644
--- a/store/tikv/client.go
+++ b/store/tikv/client.go
@@ -152,16 +152,15 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
 				return errors.Trace(err)
 			}
 			batchClient := &batchCommandsClient{
-				target:                 a.target,
-				conn:                   conn,
-				client:                 streamClient,
-				batched:                sync.Map{},
-				idAlloc:                0,
-				tikvTransportLayerLoad: &a.tikvTransportLayerLoad,
-				closed:                 0,
+				target:  a.target,
+				conn:    conn,
+				client:  streamClient,
+				batched: sync.Map{},
+				idAlloc: 0,
+				closed:  0,
 			}
 			a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
-			go batchClient.batchRecvLoop(cfg.TiKVClient)
+			go batchClient.batchRecvLoop(cfg.TiKVClient, &a.tikvTransportLayerLoad)
 		}
 	}
 	go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout)
diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go
index f0fda9d6d517b..937f8221e2bef 100644
--- a/store/tikv/client_batch.go
+++ b/store/tikv/client_batch.go
@@ -152,20 +152,50 @@ func fetchMorePendingRequests(
 	}
 }
 
+type tryLock struct {
+	sync.RWMutex
+	reCreating bool
+}
+
+func (l *tryLock) tryLockForSend() bool {
+	l.RLock()
+	if l.reCreating {
+		l.RUnlock()
+		return false
+	}
+	return true
+}
+
+func (l *tryLock) unlockForSend() {
+	l.RUnlock()
+}
+
+func (l *tryLock) lockForRecreate() {
+	l.Lock()
+	l.reCreating = true
+	l.Unlock()
+
+}
+
+func (l *tryLock) unlockForRecreate() {
+	l.Lock()
+	l.reCreating = false
+	l.Unlock()
+}
+
 type batchCommandsClient struct {
 	// The target host.
 	target string
 
-	conn                   *grpc.ClientConn
-	client                 tikvpb.Tikv_BatchCommandsClient
-	batched                sync.Map
-	idAlloc                uint64
-	tikvTransportLayerLoad *uint64
+	conn    *grpc.ClientConn
+	client  tikvpb.Tikv_BatchCommandsClient
+	batched sync.Map
+	idAlloc uint64
 
 	// closed indicates the batch client is closed explicitly or not.
 	closed int32
-	// clientLock protects client when re-create the streaming.
-	clientLock sync.Mutex
+	// tryLock protects the client when re-creating the streaming client.
+	tryLock
 }
 
 func (c *batchCommandsClient) isStopped() bool {
@@ -173,10 +203,6 @@ func (c *batchCommandsClient) isStopped() bool {
 }
 
 func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries []*batchCommandsEntry) {
-	// Use the lock to protect the stream client won't be replaced by RecvLoop,
-	// and new added request won't be removed by `failPendingRequests`.
-	c.clientLock.Lock()
-	defer c.clientLock.Unlock()
 	for i, requestID := range request.RequestIds {
 		c.batched.Store(requestID, entries[i])
 	}
@@ -211,10 +237,7 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
 	})
 }
 
-func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
-	// Hold the lock to forbid batchSendLoop using the old client.
-	c.clientLock.Lock()
-	defer c.clientLock.Unlock()
+func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error {
 	c.failPendingRequests(err) // fail all pending requests.
 
 	// Re-establish a application layer stream. TCP layer is handled by gRPC.
@@ -226,6 +249,7 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
 			zap.String("target", c.target),
 		)
 		c.client = streamClient
+		return nil
 	}
 
 	logutil.Logger(context.Background()).Error(
@@ -236,7 +260,7 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
 	return err
 }
 
-func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
+func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransportLayerLoad *uint64) {
 	defer func() {
 		if r := recover(); r != nil {
 			metrics.PanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc()
@@ -244,7 +268,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
 				zap.Reflect("r", r),
 				zap.Stack("stack"))
 			logutil.Logger(context.Background()).Info("restart batchRecvLoop")
-			go c.batchRecvLoop(cfg)
+			go c.batchRecvLoop(cfg, tikvTransportLayerLoad)
 		}
 	}()
 
@@ -257,21 +281,9 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
 				zap.Error(err),
 			)
 
-			b := NewBackoffer(context.Background(), math.MaxInt32)
 			now := time.Now()
-			for { // try to re-create the streaming in the loop.
-				if c.isStopped() {
-					return
-				}
-
-				err1 := c.reCreateStreamingClient(err)
-				if err1 == nil {
-					break
-				}
-				err2 := b.Backoff(boTiKVRPC, err1)
-				// As timeout is set to math.MaxUint32, err2 should always be nil.
-				// This line is added to make the 'make errcheck' pass.
-				terror.Log(err2)
-			}
+			if stopped := c.reCreateStreamingClient(err); stopped {
+				return
+			}
 			metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds())
 			continue
@@ -294,14 +306,37 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
 			c.batched.Delete(requestID)
 		}
 
-		tikvTransportLayerLoad := resp.GetTransportLayerLoad()
-		if tikvTransportLayerLoad > 0.0 && cfg.MaxBatchWaitTime > 0 {
+		transportLayerLoad := resp.GetTransportLayerLoad()
+		if transportLayerLoad > 0 && cfg.MaxBatchWaitTime > 0 {
 			// We need to consider TiKV load only if batch-wait strategy is enabled.
-			atomic.StoreUint64(c.tikvTransportLayerLoad, tikvTransportLayerLoad)
+			atomic.StoreUint64(tikvTransportLayerLoad, transportLayerLoad)
 		}
 	}
 }
 
+func (c *batchCommandsClient) reCreateStreamingClient(err error) (stopped bool) {
+	// Forbid batchSendLoop from using the old client.
+	c.lockForRecreate()
+	defer c.unlockForRecreate()
+
+	b := NewBackoffer(context.Background(), math.MaxInt32)
+	for { // try to re-create the streaming in the loop.
+		if c.isStopped() {
+			return true
+		}
+		err1 := c.reCreateStreamingClientOnce(err)
+		if err1 == nil {
+			break
+		}
+
+		err2 := b.Backoff(boTiKVRPC, err1)
+		// As the timeout is set to math.MaxInt32, err2 should always be nil.
+		// This line is added to make the 'make errcheck' pass.
+		terror.Log(err2)
+	}
+	return false
+}
+
 type batchCommandsEntry struct {
 	req *tikvpb.BatchCommandsRequest_Request
 	res chan *tikvpb.BatchCommandsResponse_Response
@@ -335,10 +370,6 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
 	var bestBatchWaitSize = cfg.BatchWaitSize
 	for {
-		// Choose a connection by round-robbin.
-		next := atomic.AddUint32(&a.index, 1) % uint32(len(a.batchCommandsClients))
-		batchCommandsClient := a.batchCommandsClients[next]
-
 		entries = entries[:0]
 		requests = requests[:0]
 		requestIDs = requestIDs[:0]
 
@@ -347,9 +378,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
 		a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests)
 
 		if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
-			tikvTransportLayerLoad := atomic.LoadUint64(batchCommandsClient.tikvTransportLayerLoad)
 			// If the target TiKV is overload, wait a while to collect more requests.
-			if uint(tikvTransportLayerLoad) >= cfg.OverloadThreshold {
+			if atomic.LoadUint64(&a.tikvTransportLayerLoad) >= uint64(cfg.OverloadThreshold) {
 				fetchMorePendingRequests(
 					a.batchCommandsCh, int(cfg.MaxBatchSize), int(bestBatchWaitSize),
 					cfg.MaxBatchWaitTime, &entries, &requests,
@@ -367,23 +397,40 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
 				bestBatchWaitSize += 1
 			}
 
-		length = removeCanceledRequests(&entries, &requests)
-		if length == 0 {
+		entries, requests = removeCanceledRequests(entries, requests)
+		if len(entries) == 0 {
 			continue // All requests are canceled.
 		}
 
-		maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length))
-		for i := 0; i < length; i++ {
-			requestID := uint64(i) + maxBatchID - uint64(length)
-			requestIDs = append(requestIDs, requestID)
-		}
-
-		req := &tikvpb.BatchCommandsRequest{
-			Requests:   requests,
-			RequestIds: requestIDs,
+		a.getClientAndSend(entries, requests, requestIDs)
+	}
+}
+
+func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*tikvpb.BatchCommandsRequest_Request, requestIDs []uint64) {
+	// Choose a connection by round-robin.
+	var cli *batchCommandsClient
+	for {
+		a.index = (a.index + 1) % uint32(len(a.batchCommandsClients))
+		cli = a.batchCommandsClients[a.index]
+		// The lock protects the batchCommandsClient from being closed while it's in use.
+		if cli.tryLockForSend() {
+			break
 		}
+	}
+	defer cli.unlockForSend()
 
-		batchCommandsClient.send(req, entries)
+	maxBatchID := atomic.AddUint64(&cli.idAlloc, uint64(len(requests)))
+	for i := 0; i < len(requests); i++ {
+		requestID := uint64(i) + maxBatchID - uint64(len(requests))
+		requestIDs = append(requestIDs, requestID)
 	}
+
+	req := &tikvpb.BatchCommandsRequest{
+		Requests:   requests,
+		RequestIds: requestIDs,
+	}
+
+	cli.send(req, entries)
+	return
 }
 
 func (a *batchConn) Close() {
@@ -399,20 +446,17 @@ func (a *batchConn) Close() {
 }
 
 // removeCanceledRequests removes canceled requests before sending.
-func removeCanceledRequests(
-	entries *[]*batchCommandsEntry,
-	requests *[]*tikvpb.BatchCommandsRequest_Request) int {
-	validEntries := (*entries)[:0]
-	validRequets := (*requests)[:0]
-	for _, e := range *entries {
+func removeCanceledRequests(entries []*batchCommandsEntry,
+	requests []*tikvpb.BatchCommandsRequest_Request) ([]*batchCommandsEntry, []*tikvpb.BatchCommandsRequest_Request) {
+	validEntries := entries[:0]
+	validRequets := requests[:0]
+	for _, e := range entries {
 		if !e.isCanceled() {
 			validEntries = append(validEntries, e)
 			validRequets = append(validRequets, e.req)
 		}
 	}
-	*entries = validEntries
-	*requests = validRequets
-	return len(*entries)
+	return validEntries, validRequets
 }
 
 func sendBatchRequest(
diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go
index 081607f26db31..ab6a84d87b9ba 100644
--- a/store/tikv/client_test.go
+++ b/store/tikv/client_test.go
@@ -74,8 +74,8 @@ func (s *testClientSuite) TestRemoveCanceledRequests(c *C) {
 	for i := range entries {
 		requests[i] = entries[i].req
 	}
-	length := removeCanceledRequests(&entries, &requests)
-	c.Assert(length, Equals, 2)
+	entries, requests = removeCanceledRequests(entries, requests)
+	c.Assert(len(entries), Equals, 2)
 	for _, e := range entries {
 		c.Assert(e.isCanceled(), IsFalse)
 	}
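
For readers of the patch, here is a minimal, self-contained sketch of the `tryLock` pattern it introduces (the `client` type and the demo in `main` are made-up stand-ins, not code from this patch). Senders take the read lock and skip a connection whose stream is being re-created; the recovery path takes the write lock, which also waits for in-flight senders to drain before flipping the flag:

```go
package main

import (
	"fmt"
	"sync"
)

// tryLock mirrors the pattern in the patch: tryLockForSend fails fast if a
// re-creation is in progress, and lockForRecreate blocks until in-flight
// senders (read-lock holders) have finished before setting the flag.
type tryLock struct {
	sync.RWMutex
	reCreating bool
}

func (l *tryLock) tryLockForSend() bool {
	l.RLock()
	if l.reCreating {
		l.RUnlock()
		return false
	}
	return true // the caller must call unlockForSend afterwards
}

func (l *tryLock) unlockForSend() { l.RUnlock() }

func (l *tryLock) lockForRecreate() {
	l.Lock()
	l.reCreating = true
	l.Unlock()
}

func (l *tryLock) unlockForRecreate() {
	l.Lock()
	l.reCreating = false
	l.Unlock()
}

// client is a hypothetical stand-in for batchCommandsClient.
type client struct {
	tryLock
	name string
}

func main() {
	clients := []*client{{name: "conn-0"}, {name: "conn-1"}}

	// Simulate conn-0 being re-created after a stream error.
	clients[0].lockForRecreate()
	defer clients[0].unlockForRecreate()

	// Round-robin selection that skips busy clients, as in getClientAndSend.
	// Note: like the patch, this spins if every client is re-creating.
	var cli *client
	for i := 0; ; i = (i + 1) % len(clients) {
		if clients[i].tryLockForSend() {
			cli = clients[i]
			break
		}
	}
	fmt.Println("sending on", cli.name) // prints "sending on conn-1"
	cli.unlockForSend()
}
```

This is what replaces the old `clientLock sync.Mutex`: a send no longer blocks behind the potentially very long backoff loop in `reCreateStreamingClient`; it simply moves on to another connection.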
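
Similarly, the reworked `removeCanceledRequests` drops the pointer-to-slice parameters in favor of Go's usual filter-in-place idiom. A rough standalone illustration (the `entry` type and `keepActive` below are hypothetical stand-ins for `batchCommandsEntry` and the real function):

```go
package main

import "fmt"

type entry struct {
	id       uint64
	canceled bool
}

// keepActive filters canceled entries while reusing the input's backing
// array: valid := entries[:0] shares storage with entries, so appends
// overwrite the prefix in place and no new allocation is needed.
func keepActive(entries []entry) []entry {
	valid := entries[:0]
	for _, e := range entries {
		if !e.canceled {
			valid = append(valid, e)
		}
	}
	return valid
}

func main() {
	es := []entry{{1, false}, {2, true}, {3, false}}
	es = keepActive(es)
	fmt.Println(es) // [{1 false} {3 false}]
}
```

Because the returned slices share backing storage with the inputs, callers must use the returned values, which is why `batchSendLoop` and the test now reassign `entries, requests = removeCanceledRequests(entries, requests)`.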