Skip to content

Commit

Permalink
gcworker: fix potential gcworker goroutine leak during tikv down (#13921
Browse files Browse the repository at this point in the history
) (#14106)
  • Loading branch information
lysu authored and ngaut committed Dec 19, 2019
1 parent 1c224aa commit fda6d3b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
16 changes: 13 additions & 3 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -818,15 +819,20 @@ func (w *GCWorker) resolveLocksForRange(

regions := 0
key := startKey
bo := tikv.NewBackoffer(ctx, tikv.GcResolveLockMaxBackoff)
failpoint.Inject("setGcResolveMaxBackoff", func(v failpoint.Value) {
sleep := v.(int)
// cooperate with github.com/pingcap/tidb/store/tikv/invalidCacheAndRetry
ctx = context.WithValue(ctx, "injectedBackoff", struct{}{})
bo = tikv.NewBackoffer(ctx, sleep)
})
for {
select {
case <-ctx.Done():
return regions, errors.New("[gc worker] gc job canceled")
default:
}

bo := tikv.NewBackoffer(ctx, tikv.GcResolveLockMaxBackoff)

req.ScanLock.StartKey = key
loc, err := w.store.GetRegionCache().LocateKey(bo, key)
if err != nil {
Expand Down Expand Up @@ -871,7 +877,6 @@ func (w *GCWorker) resolveLocksForRange(
}
continue
}

if len(locks) < gcScanLockLimit {
regions++
key = loc.EndKey
Expand All @@ -887,6 +892,11 @@ func (w *GCWorker) resolveLocksForRange(
if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) {
break
}
bo = tikv.NewBackoffer(ctx, tikv.GcResolveLockMaxBackoff)
failpoint.Inject("setGcResolveMaxBackoff", func(v failpoint.Value) {
sleep := v.(int)
bo = tikv.NewBackoffer(ctx, sleep)
})
}
return regions, nil
}
Expand Down
11 changes: 11 additions & 0 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,17 @@ func (s *testGCWorkerSuite) TestCheckGCMode(c *C) {
c.Assert(useDistributedGC, Equals, true)
}

func (s *testGCWorkerSuite) TestResolveLockRangeInfine(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/invalidCacheAndRetry", "return(true)"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/setGcResolveMaxBackoff", "return(1)"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/invalidCacheAndRetry"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/setGcResolveMaxBackoff"), IsNil)
}()
_, err := s.gcWorker.resolveLocksForRange(context.Background(), 1, []byte{0}, []byte{1})
c.Assert(err, NotNil)
}

func (s *testGCWorkerSuite) TestRunGCJob(c *C) {
gcSafePointCacheInterval = 0
err := RunGCJob(context.Background(), s.store, 0, "mock", 1)
Expand Down
8 changes: 8 additions & 0 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ func (s *RegionRequestSender) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, re
if err != nil {
return nil, nil, errors.Trace(err)
}
var resp *tikvrpc.Response
failpoint.Inject("invalidCacheAndRetry", func() {
// cooperate with github.com/pingcap/tidb/store/tikv/gcworker/setGcResolveMaxBackoff
if c := bo.ctx.Value("injectedBackoff"); c != nil {
resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
failpoint.Return(resp, nil, err)
}
})
if ctx == nil {
// If the region is not found in cache, it must be out
// of date and already be cleaned up. We can skip the
Expand Down

0 comments on commit fda6d3b

Please sign in to comment.