store/tikv: reduce the lock contend between sending message and re-creating streaming client (#11301) #11372

Merged 1 commit on Jul 23, 2019
15 changes: 7 additions & 8 deletions store/tikv/client.go
@@ -152,16 +152,15 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
return errors.Trace(err)
}
batchClient := &batchCommandsClient{
target: a.target,
conn: conn,
client: streamClient,
batched: sync.Map{},
idAlloc: 0,
tikvTransportLayerLoad: &a.tikvTransportLayerLoad,
closed: 0,
target: a.target,
conn: conn,
client: streamClient,
batched: sync.Map{},
idAlloc: 0,
closed: 0,
}
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
go batchClient.batchRecvLoop(cfg.TiKVClient)
go batchClient.batchRecvLoop(cfg.TiKVClient, &a.tikvTransportLayerLoad)
}
}
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout)
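The client.go change above drops the per-client tikvTransportLayerLoad pointer field: the counter stays on the connArray, and each batchRecvLoop now receives a pointer to it as an argument. Below is a standalone sketch of that shared-atomic-counter pattern, assuming only the standard library and using illustrative names rather than the real TiDB types:

```go
// Sketch only: a single load counter owned by the connection array.
// Receive loops publish the load reported by the server; the send loop
// reads it with an atomic load when deciding whether to wait for a
// larger batch. Names here are illustrative, not the TiDB identifiers.
package main

import (
	"fmt"
	"sync/atomic"
)

type connArray struct {
	transportLayerLoad uint64 // shared by every recv loop of this array
}

// recvLoop stands in for batchRecvLoop: it stores the load value
// reported in the latest response.
func recvLoop(load *uint64, reportedByServer uint64) {
	atomic.StoreUint64(load, reportedByServer)
}

// shouldWaitForMoreRequests stands in for the send loop's overload check.
func shouldWaitForMoreRequests(a *connArray, overloadThreshold uint64) bool {
	return atomic.LoadUint64(&a.transportLayerLoad) >= overloadThreshold
}

func main() {
	a := &connArray{}
	recvLoop(&a.transportLayerLoad, 250) // every recv loop gets the same pointer
	fmt.Println(shouldWaitForMoreRequests(a, 200)) // true
}
```

Keeping one counter per connection array lets the send loop read the load directly from the connArray (as the batchSendLoop hunk below does) instead of first picking a client just to reach its pointer field.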
164 changes: 104 additions & 60 deletions store/tikv/client_batch.go
@@ -152,31 +152,57 @@ func fetchMorePendingRequests(
}
}

type tryLock struct {
sync.RWMutex
reCreating bool
}

func (l *tryLock) tryLockForSend() bool {
l.RLock()
if l.reCreating {
l.RUnlock()
return false
}
return true
}

func (l *tryLock) unlockForSend() {
l.RUnlock()
}

func (l *tryLock) lockForRecreate() {
l.Lock()
l.reCreating = true
l.Unlock()
}

func (l *tryLock) unlockForRecreate() {
l.Lock()
l.reCreating = false
l.Unlock()
}

type batchCommandsClient struct {
// The target host.
target string

conn *grpc.ClientConn
client tikvpb.Tikv_BatchCommandsClient
batched sync.Map
idAlloc uint64
tikvTransportLayerLoad *uint64
conn *grpc.ClientConn
client tikvpb.Tikv_BatchCommandsClient
batched sync.Map
idAlloc uint64

// closed indicates the batch client is closed explicitly or not.
closed int32
// clientLock protects client when re-create the streaming.
clientLock sync.Mutex
// tryLock protects the stream client while it is being re-created.
tryLock
}

func (c *batchCommandsClient) isStopped() bool {
return atomic.LoadInt32(&c.closed) != 0
}

func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries []*batchCommandsEntry) {
// Use the lock to protect the stream client won't be replaced by RecvLoop,
// and new added request won't be removed by `failPendingRequests`.
c.clientLock.Lock()
defer c.clientLock.Unlock()
for i, requestID := range request.RequestIds {
c.batched.Store(requestID, entries[i])
}
@@ -211,10 +237,7 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
})
}

func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
// Hold the lock to forbid batchSendLoop using the old client.
c.clientLock.Lock()
defer c.clientLock.Unlock()
func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error {
c.failPendingRequests(err) // fail all pending requests.

// Re-establish a application layer stream. TCP layer is handled by gRPC.
@@ -226,6 +249,7 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
zap.String("target", c.target),
)
c.client = streamClient

return nil
}
logutil.Logger(context.Background()).Error(
@@ -236,15 +260,15 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
return err
}

func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransportLayerLoad *uint64) {
defer func() {
if r := recover(); r != nil {
metrics.PanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc()
logutil.Logger(context.Background()).Error("batchRecvLoop",
zap.Reflect("r", r),
zap.Stack("stack"))
logutil.Logger(context.Background()).Info("restart batchRecvLoop")
go c.batchRecvLoop(cfg)
go c.batchRecvLoop(cfg, tikvTransportLayerLoad)
}
}()

@@ -257,21 +281,9 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
zap.Error(err),
)

b := NewBackoffer(context.Background(), math.MaxInt32)
now := time.Now()
for { // try to re-create the streaming in the loop.
if c.isStopped() {
return
}

err1 := c.reCreateStreamingClient(err)
if err1 == nil {
break
}
err2 := b.Backoff(boTiKVRPC, err1)
// As timeout is set to math.MaxUint32, err2 should always be nil.
// This line is added to make the 'make errcheck' pass.
terror.Log(err2)
if stopped := c.reCreateStreamingClient(err); stopped {
return
}
metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds())
continue
@@ -294,14 +306,37 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
c.batched.Delete(requestID)
}

tikvTransportLayerLoad := resp.GetTransportLayerLoad()
if tikvTransportLayerLoad > 0.0 && cfg.MaxBatchWaitTime > 0 {
transportLayerLoad := resp.GetTransportLayerLoad()
if transportLayerLoad > 0.0 && cfg.MaxBatchWaitTime > 0 {
// We need to consider TiKV load only if batch-wait strategy is enabled.
atomic.StoreUint64(c.tikvTransportLayerLoad, tikvTransportLayerLoad)
atomic.StoreUint64(tikvTransportLayerLoad, transportLayerLoad)
}
}
}

func (c *batchCommandsClient) reCreateStreamingClient(err error) (stopped bool) {
// Forbid the batchSendLoop from using the old client.
c.lockForRecreate()
defer c.unlockForRecreate()

b := NewBackoffer(context.Background(), math.MaxInt32)
for { // try to re-create the streaming in the loop.
if c.isStopped() {
return true
}
err1 := c.reCreateStreamingClientOnce(err)
if err1 == nil {
break
}

err2 := b.Backoff(boTiKVRPC, err1)
// As timeout is set to math.MaxUint32, err2 should always be nil.
// This line is added to make the 'make errcheck' pass.
terror.Log(err2)
}
return false
}

type batchCommandsEntry struct {
req *tikvpb.BatchCommandsRequest_Request
res chan *tikvpb.BatchCommandsResponse_Response
@@ -335,10 +370,6 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {

var bestBatchWaitSize = cfg.BatchWaitSize
for {
// Choose a connection by round-robbin.
next := atomic.AddUint32(&a.index, 1) % uint32(len(a.batchCommandsClients))
batchCommandsClient := a.batchCommandsClients[next]

entries = entries[:0]
requests = requests[:0]
requestIDs = requestIDs[:0]
@@ -347,9 +378,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests)

if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
tikvTransportLayerLoad := atomic.LoadUint64(batchCommandsClient.tikvTransportLayerLoad)
// If the target TiKV is overload, wait a while to collect more requests.
if uint(tikvTransportLayerLoad) >= cfg.OverloadThreshold {
if atomic.LoadUint64(&a.tikvTransportLayerLoad) >= uint64(cfg.OverloadThreshold) {
fetchMorePendingRequests(
a.batchCommandsCh, int(cfg.MaxBatchSize), int(bestBatchWaitSize),
cfg.MaxBatchWaitTime, &entries, &requests,
@@ -367,23 +397,40 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
bestBatchWaitSize += 1
}

length = removeCanceledRequests(&entries, &requests)
if length == 0 {
entries, requests = removeCanceledRequests(entries, requests)
if len(entries) == 0 {
continue // All requests are canceled.
}
maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length))
for i := 0; i < length; i++ {
requestID := uint64(i) + maxBatchID - uint64(length)
requestIDs = append(requestIDs, requestID)
}

req := &tikvpb.BatchCommandsRequest{
Requests: requests,
RequestIds: requestIDs,
a.getClientAndSend(entries, requests, requestIDs)
}
}

func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*tikvpb.BatchCommandsRequest_Request, requestIDs []uint64) {
// Choose a connection by round-robin.
var cli *batchCommandsClient
for {
a.index = (a.index + 1) % uint32(len(a.batchCommandsClients))
cli = a.batchCommandsClients[a.index]
// The lock protects the batchCommandsClient from being closed while it's in use.
if cli.tryLockForSend() {
break
}
}
defer cli.unlockForSend()

batchCommandsClient.send(req, entries)
maxBatchID := atomic.AddUint64(&cli.idAlloc, uint64(len(requests)))
for i := 0; i < len(requests); i++ {
requestID := uint64(i) + maxBatchID - uint64(len(requests))
requestIDs = append(requestIDs, requestID)
}
req := &tikvpb.BatchCommandsRequest{
Requests: requests,
RequestIds: requestIDs,
}

cli.send(req, entries)
return
}

func (a *batchConn) Close() {
@@ -399,20 +446,17 @@ func (a *batchConn) Close() {
}

// removeCanceledRequests removes canceled requests before sending.
func removeCanceledRequests(
entries *[]*batchCommandsEntry,
requests *[]*tikvpb.BatchCommandsRequest_Request) int {
validEntries := (*entries)[:0]
validRequets := (*requests)[:0]
for _, e := range *entries {
func removeCanceledRequests(entries []*batchCommandsEntry,
requests []*tikvpb.BatchCommandsRequest_Request) ([]*batchCommandsEntry, []*tikvpb.BatchCommandsRequest_Request) {
validEntries := entries[:0]
validRequets := requests[:0]
for _, e := range entries {
if !e.isCanceled() {
validEntries = append(validEntries, e)
validRequets = append(validRequets, e.req)
}
}
*entries = validEntries
*requests = validRequets
return len(*entries)
return validEntries, validRequets
}

func sendBatchRequest(
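The centerpiece of the client_batch.go diff is the tryLock type: many senders can hold the read lock concurrently and give up immediately when a re-creation is in flight, while re-creation takes the write lock only to flip the reCreating flag. The sketch below reproduces that type from the diff and adds a small, purely illustrative main to show the two paths; it is a self-contained demo, not the TiDB code:

```go
// Demo of the tryLock pattern from this diff: senders share the read lock
// and fail fast if the stream is being re-created; re-creation takes the
// write lock only to toggle the flag.
package main

import (
	"fmt"
	"sync"
)

type tryLock struct {
	sync.RWMutex
	reCreating bool
}

func (l *tryLock) tryLockForSend() bool {
	l.RLock()
	if l.reCreating { // the stream client is being replaced; don't use it
		l.RUnlock()
		return false
	}
	return true // caller holds the read lock until unlockForSend
}

func (l *tryLock) unlockForSend() { l.RUnlock() }

func (l *tryLock) lockForRecreate() {
	l.Lock() // waits for in-flight senders to drain
	l.reCreating = true
	l.Unlock()
}

func (l *tryLock) unlockForRecreate() {
	l.Lock()
	l.reCreating = false
	l.Unlock()
}

func main() {
	var l tryLock
	if l.tryLockForSend() { // many senders may hold this concurrently
		fmt.Println("sending on a healthy client")
		l.unlockForSend()
	}
	l.lockForRecreate()
	fmt.Println("re-creating; senders skip this client:", l.tryLockForSend()) // false
	l.unlockForRecreate()
}
```

Because tryLockForSend fails fast instead of blocking, getClientAndSend can round-robin past a client whose stream is being rebuilt and hand the batch to another one, which is where the reduced contention between sending and re-creating comes from.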
4 changes: 2 additions & 2 deletions store/tikv/client_test.go
@@ -74,8 +74,8 @@ func (s *testClientSuite) TestRemoveCanceledRequests(c *C) {
for i := range entries {
requests[i] = entries[i].req
}
length := removeCanceledRequests(&entries, &requests)
c.Assert(length, Equals, 2)
entries, requests = removeCanceledRequests(entries, requests)
c.Assert(len(entries), Equals, 2)
for _, e := range entries {
c.Assert(e.isCanceled(), IsFalse)
}
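The test change mirrors the new removeCanceledRequests signature: the function now filters both slices in place and returns them, rather than mutating through pointers and returning a length. Here is a minimal sketch of that filter-in-place pattern with stand-in types, not the TiDB ones:

```go
// Sketch of the filter-in-place idiom: reslice to s[:0] to reuse the
// original backing arrays, append only the entries to keep, and return
// the filtered slices for the caller to adopt.
package main

import "fmt"

type entry struct {
	id       int
	canceled bool
}

func removeCanceled(entries []*entry, ids []int) ([]*entry, []int) {
	validEntries := entries[:0] // reuses the original backing array
	validIDs := ids[:0]
	for i, e := range entries {
		if !e.canceled {
			validEntries = append(validEntries, e)
			validIDs = append(validIDs, ids[i])
		}
	}
	return validEntries, validIDs
}

func main() {
	entries := []*entry{{1, false}, {2, true}, {3, false}}
	ids := []int{1, 2, 3}
	entries, ids = removeCanceled(entries, ids)
	fmt.Println(len(entries), ids) // 2 [1 3]
}
```

Reslicing with entries[:0] means the filter allocates nothing; callers simply adopt the returned slices, as the updated test assertion does.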