Skip to content

Commit

Permalink
fix the issue that health check may set liveness wrongly (#1127) (#1136)
Browse files Browse the repository at this point in the history
* fix the issue that health check may set liveness wrongly



* fix lint issue



* fix rawkv ut



* fix data race



* use getStore instead of accessing storeMu directly



* make TestAccessFollowerAfter1TiKVDown stable



* make TestBackoffErrorType stable



* address comments



---------

Signed-off-by: zyguan <[email protected]>
Co-authored-by: disksing <[email protected]>
  • Loading branch information
zyguan and disksing committed Jan 25, 2024
1 parent 4ecf7c2 commit 4c64e5d
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 13 deletions.
61 changes: 50 additions & 11 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2448,6 +2448,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
addr = store.GetAddress()
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
newStore.livenessState = atomic.LoadUint32(&s.livenessState)
newStore.unreachableSince = s.unreachableSince
c.storeMu.Lock()
c.storeMu.stores[newStore.storeID] = newStore
c.storeMu.Unlock()
Expand Down Expand Up @@ -2600,49 +2602,86 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessSt
// It may be already started by another thread.
if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
s.unreachableSince = time.Now()
go s.checkUntilHealth(c)
reResolveInterval := 30 * time.Second
if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil {
if dur, err := time.ParseDuration(val.(string)); err == nil {
reResolveInterval = dur
}
}
go s.checkUntilHealth(c, liveness, reResolveInterval)
}
}

func (s *Store) checkUntilHealth(c *RegionCache) {
defer atomic.StoreUint32(&s.livenessState, uint32(reachable))

func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResolveInterval time.Duration) {
ticker := time.NewTicker(time.Second)
defer func() {
ticker.Stop()
if liveness != reachable {
logutil.BgLogger().Warn("[health check] store was still not reachable at the end of health check loop",
zap.Uint64("storeID", s.storeID),
zap.String("state", s.getResolveState().String()),
zap.String("liveness", s.getLivenessState().String()))
}
}()
lastCheckPDTime := time.Now()

for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
if time.Since(lastCheckPDTime) > time.Second*30 {
if time.Since(lastCheckPDTime) > reResolveInterval {
lastCheckPDTime = time.Now()

valid, err := s.reResolve(c)
if err != nil {
logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
} else if !valid {
logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr))
if s.getResolveState() == deleted {
// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
c.storeMu.RLock()
newStore := c.storeMu.stores[s.storeID]
c.storeMu.RUnlock()
logutil.BgLogger().Info("[health check] store meta changed",
zap.Uint64("storeID", s.storeID),
zap.String("oldAddr", s.addr),
zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
zap.String("newAddr", newStore.addr),
zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
go newStore.checkUntilHealth(c, liveness, reResolveInterval)
}
return
}
}

bo := retry.NewNoopBackoff(c.ctx)
l := s.requestLiveness(bo, c)
if l == reachable {
liveness = s.requestLiveness(bo, c)
atomic.StoreUint32(&s.livenessState, uint32(liveness))
if liveness == reachable {
logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))

return
}
atomic.StoreUint32(&s.livenessState, uint32(l))
}
}
}

func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l livenessState) {
// It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead.
if val, err := util.EvalFailpoint("injectLiveness"); err == nil {
switch val.(string) {
liveness := val.(string)
if strings.Contains(liveness, " ") {
for _, item := range strings.Split(liveness, " ") {
kv := strings.Split(item, ":")
if len(kv) != 2 {
continue
}
if kv[0] == s.addr {
liveness = kv[1]
break
}
}
}
switch liveness {
case "unreachable":
return unreachable
case "reachable":
Expand Down
43 changes: 43 additions & 0 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1665,3 +1665,46 @@ func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
}, 3*time.Second, 200*time.Millisecond)
s.checkCache(remaining)
}

func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
// init region cache
s.cache.LocateKey(s.bo, []byte("a"))

store1 := s.cache.getStoreByStoreID(s.store1)
s.Require().NotNil(store1)
s.Require().Equal(resolved, store1.getResolveState())

// setup mock liveness func
store1Liveness := uint32(unreachable)
tf := func(s *Store, bo *retry.Backoffer) livenessState {
if s.storeID == store1.storeID {
return livenessState(atomic.LoadUint32(&store1Liveness))
}
return reachable
}
s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))

// start health check loop
atomic.StoreUint32(&store1.livenessState, store1Liveness)
go store1.checkUntilHealth(s.cache, livenessState(store1Liveness), time.Second)

// update store meta
s.cluster.UpdateStoreAddr(store1.storeID, store1.addr+"'", store1.labels...)

// assert that the old store should be deleted and it's not reachable
s.Eventually(func() bool {
return store1.getResolveState() == deleted && store1.getLivenessState() != reachable
}, 3*time.Second, time.Second)

// assert that the new store should be added and it's also not reachable
newStore1 := s.cache.getStoreByStoreID(s.store1)
s.Require().NotEqual(reachable, newStore1.getLivenessState())

// recover store1
atomic.StoreUint32(&store1Liveness, uint32(reachable))

// assert that the new store should be reachable
s.Eventually(func() bool {
return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
}, 3*time.Second, time.Second)
}
2 changes: 1 addition & 1 deletion internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
regionStore := region.getStore()
leaderAddr = regionStore.stores[regionStore.workTiKVIdx].addr
s.NotEqual(leaderAddr, "")
for i := 0; i < 10; i++ {
for i := 0; i < 30; i++ {
bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
s.Nil(err)
Expand Down
3 changes: 2 additions & 1 deletion internal/retry/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func TestBackoffErrorType(t *testing.T) {
err = b.Backoff(BoTxnNotFound, errors.New("txn not found"))
if err != nil {
// Next backoff should return error of backoff that sleeps for longest time.
assert.ErrorIs(t, err, BoTxnNotFound.err)
cfg, _ := b.longestSleepCfg()
assert.ErrorIs(t, err, cfg.err)
return
}
}
Expand Down
32 changes: 32 additions & 0 deletions rawkv/rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,17 @@ import (
"hash/crc64"
"testing"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv"
)

func TestRawKV(t *testing.T) {
tikv.EnableFailpoints()
suite.Run(t, new(testRawkvSuite))
}

Expand Down Expand Up @@ -77,9 +80,11 @@ func (s *testRawkvSuite) SetupTest() {
s.peer1 = peerIDs[0]
s.peer2 = peerIDs[1]
s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)
s.Nil(failpoint.Enable("tikvclient/injectReResolveInterval", `return("1s")`))
}

func (s *testRawkvSuite) TearDownTest() {
s.Nil(failpoint.Disable("tikvclient/injectReResolveInterval"))
s.mvccStore.Close()
}

Expand Down Expand Up @@ -110,6 +115,9 @@ func (s *testRawkvSuite) TestReplaceAddrWithNewStore() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

getVal, err := client.Get(context.Background(), testKey)

s.Nil(err)
Expand Down Expand Up @@ -172,6 +180,9 @@ func (s *testRawkvSuite) TestReplaceNewAddrAndOldOfflineImmediately() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

getVal, err := client.Get(context.Background(), testKey)
s.Nil(err)
s.Equal(getVal, testValue)
Expand Down Expand Up @@ -200,6 +211,9 @@ func (s *testRawkvSuite) TestReplaceStore() {
s.cluster.RemovePeer(s.region1, s.peer1)
s.cluster.ChangeLeader(s.region1, peer3)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

err = client.Put(context.Background(), testKey, testValue)
s.Nil(err)
}
Expand Down Expand Up @@ -234,6 +248,9 @@ func (s *testRawkvSuite) TestColumnFamilyForClient() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test get
client.SetColumnFamily(cf1)
getVal, err := client.Get(context.Background(), testKeyCf1)
Expand Down Expand Up @@ -303,6 +320,9 @@ func (s *testRawkvSuite) TestColumnFamilyForOptions() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test get
getVal, err := client.Get(context.Background(), keyInCf1, SetColumnFamily(cf1))
s.Nil(err)
Expand Down Expand Up @@ -370,6 +390,9 @@ func (s *testRawkvSuite) TestBatch() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test BatchGet
returnValues, err := client.BatchGet(context.Background(), keys, SetColumnFamily(cf))
s.Nil(err)
Expand Down Expand Up @@ -426,6 +449,9 @@ func (s *testRawkvSuite) TestScan() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test scan
startKey, endKey := []byte("key1"), []byte("keyz")
limit := 3
Expand Down Expand Up @@ -498,6 +524,9 @@ func (s *testRawkvSuite) TestDeleteRange() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test DeleteRange
startKey, endKey := []byte("key3"), []byte(nil)
err = client.DeleteRange(context.Background(), startKey, endKey, SetColumnFamily(cf))
Expand Down Expand Up @@ -535,6 +564,9 @@ func (s *testRawkvSuite) TestCompareAndSwap() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test CompareAndSwap for false atomic
_, _, err = client.CompareAndSwap(
context.Background(),
Expand Down

0 comments on commit 4c64e5d

Please sign in to comment.