fix the issue that health check may set liveness wrongly #1127

Merged 10 commits on Jan 24, 2024
3 changes: 2 additions & 1 deletion config/retry/backoff_test.go
@@ -75,7 +75,8 @@ func TestBackoffErrorType(t *testing.T) {
err = b.Backoff(BoTxnNotFound, errors.New("txn not found"))
if err != nil {
// Next backoff should return error of backoff that sleeps for longest time.
assert.ErrorIs(t, err, BoTxnNotFound.err)
cfg, _ := b.longestSleepCfg()
assert.ErrorIs(t, err, cfg.err)
return
}
}
60 changes: 48 additions & 12 deletions internal/locate/region_cache.go
@@ -2618,6 +2618,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
addr = store.GetAddress()
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
newStore.livenessState = atomic.LoadUint32(&s.livenessState)
newStore.unreachableSince = s.unreachableSince
Comment on lines 2620 to +2622

Contributor:

How about abstracting a newStore function that forces callers to specify the livenessState and unreachableSince as input parameters?

Contributor Author:

How about handling it later (as a part of #1104)? There are 4 `&Store{...}` literals in region_cache.go now, and I'd like to handle them together according to the store lifecycle. Let this PR fix the corresponding issue for now.
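A minimal sketch of the constructor the reviewer is suggesting (hypothetical, not part of this PR; field names mirror the struct literal above, and the parameter types are assumptions):

// Hypothetical sketch only: force callers to decide livenessState and
// unreachableSince instead of silently resetting them when a Store is rebuilt.
func newStore(meta *metapb.Store, storeType tikvrpc.EndpointType,
	livenessState uint32, unreachableSince time.Time) *Store {
	return &Store{
		storeID:          meta.GetId(),
		addr:             meta.GetAddress(),
		peerAddr:         meta.GetPeerAddress(),
		saddr:            meta.GetStatusAddress(),
		storeType:        storeType,
		labels:           meta.GetLabels(),
		state:            uint64(resolved),
		livenessState:    livenessState,
		unreachableSince: unreachableSince,
	}
}

reResolve could then call newStore(store, storeType, atomic.LoadUint32(&s.livenessState), s.unreachableSince), making it impossible to forget the health-check state on the new copy.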

if s.addr == addr {
newStore.slowScore = s.slowScore
}
@@ -2783,50 +2785,84 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff
// It may be already started by another thread.
if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
s.unreachableSince = time.Now()
go s.checkUntilHealth(c)
reResolveInterval := 30 * time.Second
Contributor:

Better to make it a constant value.

Contributor Author:

The rawkv tests use a failpoint to set it shorter.

Contributor:

How about:

var DefReResolveInterval = 30 * time.Second // global scope.

func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState) {
	reResolveInterval := DefReResolveInterval
	...
}

If a test needs a shorter reResolveInterval, it can change DefReResolveInterval directly, with no need for the failpoint below.

Contributor Author:

Personally I do not prefer that. A global var has to be handled carefully to avoid data races in unit tests; I've struggled with SetRegionCacheTTLSec in #1122 for about an hour.
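To illustrate the data-race concern: a plain package-level duration mutated by a test while health-check goroutines read it would trip go test -race. A hedged sketch (hypothetical names, not part of this PR) of what a race-safe global knob would have to look like:

// Hypothetical illustration of the alternative under discussion, not code in this PR.
var defReResolveIntervalNs = int64(30 * time.Second)

// reResolveIntervalKnob reads the knob atomically so background
// health-check goroutines and tests never race on it.
func reResolveIntervalKnob() time.Duration {
	return time.Duration(atomic.LoadInt64(&defReResolveIntervalNs))
}

// setReResolveIntervalForTest lets a test shorten the interval safely.
func setReResolveIntervalForTest(d time.Duration) {
	atomic.StoreInt64(&defReResolveIntervalNs, int64(d))
}

The PR keeps the value local instead and lets tests override it through the injectReResolveInterval failpoint added below.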

if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil {
if dur, err := time.ParseDuration(val.(string)); err == nil {
reResolveInterval = dur
}
}
go s.checkUntilHealth(c, liveness, reResolveInterval)
}
return
}

func (s *Store) checkUntilHealth(c *RegionCache) {
defer atomic.StoreUint32(&s.livenessState, uint32(reachable))

func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResolveInterval time.Duration) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
defer func() {
ticker.Stop()
if liveness != reachable {
logutil.BgLogger().Warn("[health check] store was still not reachable at the end of health check loop",
zap.Uint64("storeID", s.storeID),
zap.String("state", s.getResolveState().String()),
zap.String("liveness", s.getLivenessState().String()))
}
}()
lastCheckPDTime := time.Now()

for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
if time.Since(lastCheckPDTime) > time.Second*30 {
if time.Since(lastCheckPDTime) > reResolveInterval {
lastCheckPDTime = time.Now()

valid, err := s.reResolve(c)
if err != nil {
logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
} else if !valid {
logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr))
if s.getResolveState() == deleted {
// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
newStore, _ := c.getStore(s.storeID)
Contributor:

Is it necessary to add an assertion newStore != nil here?

Contributor Author:

It's guaranteed by reResolve; changeToActiveStore also relies on it.
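If the defensive check were wanted anyway, a minimal sketch (hypothetical, not what the PR does) could log and stop the loop rather than rely solely on the reResolve guarantee:

newStore, _ := c.getStore(s.storeID)
if newStore == nil {
	// Not expected: reResolve inserts the replacement store before this
	// store is marked deleted. Bail out of the health-check loop defensively.
	logutil.BgLogger().Error("[health check] replacement store not found",
		zap.Uint64("storeID", s.storeID))
	return
}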

logutil.BgLogger().Info("[health check] store meta changed",
zap.Uint64("storeID", s.storeID),
zap.String("oldAddr", s.addr),
zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
zap.String("newAddr", newStore.addr),
zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
go newStore.checkUntilHealth(c, liveness, reResolveInterval)
}
return
}
}

l := s.requestLiveness(c.ctx, c)
if l == reachable {
liveness = s.requestLiveness(c.ctx, c)
atomic.StoreUint32(&s.livenessState, uint32(liveness))
if liveness == reachable {
logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))

return
}
atomic.StoreUint32(&s.livenessState, uint32(l))
}
}
}

func (s *Store) requestLiveness(ctx context.Context, tk testingKnobs) (l livenessState) {
// It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead.
if val, err := util.EvalFailpoint("injectLiveness"); err == nil {
switch val.(string) {
liveness := val.(string)
if strings.Contains(liveness, " ") {
for _, item := range strings.Split(liveness, " ") {
kv := strings.Split(item, ":")
if len(kv) != 2 {
continue
}
if kv[0] == s.addr {
liveness = kv[1]
break
}
}
}
switch liveness {
case "unreachable":
return unreachable
case "reachable":
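For reference, the injectLiveness failpoint value now accepts either a single state applied to every store or space-separated addr:state pairs keyed by store address. A test-side usage sketch (the addresses mirror the rawkv tests below):

// Mark the store addressed "store1" reachable and "store2" unreachable for a test;
// a bare value such as `return("reachable")` still applies to all stores.
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")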
42 changes: 42 additions & 0 deletions internal/locate/region_cache_test.go
@@ -1793,6 +1793,48 @@ func (s *testRegionCacheSuite) TestSlowScoreStat() {
s.True(slowScore.isSlow())
}

func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
// init region cache
s.cache.LocateKey(s.bo, []byte("a"))

store1, _ := s.cache.getStore(s.store1)
s.Require().NotNil(store1)
s.Require().Equal(resolved, store1.getResolveState())

// setup mock liveness func
store1Liveness := uint32(unreachable)
s.cache.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState {
if s.storeID == store1.storeID {
return livenessState(atomic.LoadUint32(&store1Liveness))
}
return reachable
})

// start health check loop
atomic.StoreUint32(&store1.livenessState, store1Liveness)
go store1.checkUntilHealth(s.cache, livenessState(store1Liveness), time.Second)

// update store meta
s.cluster.UpdateStoreAddr(store1.storeID, store1.addr+"'", store1.labels...)

// assert that the old store should be deleted and it's not reachable
s.Eventually(func() bool {
return store1.getResolveState() == deleted && store1.getLivenessState() != reachable
}, 3*time.Second, time.Second)

// assert that the new store should be added and it's also not reachable
newStore1, _ := s.cache.getStore(store1.storeID)
s.Require().NotEqual(reachable, newStore1.getLivenessState())

// recover store1
atomic.StoreUint32(&store1Liveness, uint32(reachable))

// assert that the new store should be reachable
s.Eventually(func() bool {
return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
}, 3*time.Second, time.Second)
}

func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() {
_ = s.cache.refreshRegionIndex(s.bo)
r, _ := s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
2 changes: 1 addition & 1 deletion internal/locate/region_request3_test.go
@@ -1160,7 +1160,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
regionStore := region.getStore()
leaderAddr = regionStore.stores[regionStore.workTiKVIdx].addr
s.NotEqual(leaderAddr, "")
for i := 0; i < 10; i++ {
for i := 0; i < 30; i++ {
bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
s.Nil(err)
32 changes: 32 additions & 0 deletions rawkv/rawkv_test.go
@@ -41,14 +41,17 @@ import (
"hash/crc64"
"testing"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config/retry"
"github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv"
)

func TestRawKV(t *testing.T) {
tikv.EnableFailpoints()
suite.Run(t, new(testRawkvSuite))
}

@@ -77,9 +80,11 @@ func (s *testRawkvSuite) SetupTest() {
s.peer1 = peerIDs[0]
s.peer2 = peerIDs[1]
s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)
s.Nil(failpoint.Enable("tikvclient/injectReResolveInterval", `return("1s")`))
}

func (s *testRawkvSuite) TearDownTest() {
s.Nil(failpoint.Disable("tikvclient/injectReResolveInterval"))
s.mvccStore.Close()
}

@@ -110,6 +115,9 @@ func (s *testRawkvSuite) TestReplaceAddrWithNewStore() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

getVal, err := client.Get(context.Background(), testKey)

s.Nil(err)
@@ -172,6 +180,9 @@ func (s *testRawkvSuite) TestReplaceNewAddrAndOldOfflineImmediately() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

getVal, err := client.Get(context.Background(), testKey)
s.Nil(err)
s.Equal(getVal, testValue)
@@ -200,6 +211,9 @@ func (s *testRawkvSuite) TestReplaceStore() {
s.cluster.RemovePeer(s.region1, s.peer1)
s.cluster.ChangeLeader(s.region1, peer3)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

err = client.Put(context.Background(), testKey, testValue)
s.Nil(err)
}
@@ -234,6 +248,9 @@ func (s *testRawkvSuite) TestColumnFamilyForClient() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test get
client.SetColumnFamily(cf1)
getVal, err := client.Get(context.Background(), testKeyCf1)
@@ -303,6 +320,9 @@ func (s *testRawkvSuite) TestColumnFamilyForOptions() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test get
getVal, err := client.Get(context.Background(), keyInCf1, SetColumnFamily(cf1))
s.Nil(err)
@@ -370,6 +390,9 @@ func (s *testRawkvSuite) TestBatch() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test BatchGet
returnValues, err := client.BatchGet(context.Background(), keys, SetColumnFamily(cf))
s.Nil(err)
@@ -426,6 +449,9 @@ func (s *testRawkvSuite) TestScan() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test scan
startKey, endKey := []byte("key1"), []byte("keyz")
limit := 3
@@ -498,6 +524,9 @@ func (s *testRawkvSuite) TestDeleteRange() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test DeleteRange
startKey, endKey := []byte("key3"), []byte(nil)
err = client.DeleteRange(context.Background(), startKey, endKey, SetColumnFamily(cf))
@@ -535,6 +564,9 @@ func (s *testRawkvSuite) TestCompareAndSwap() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test CompareAndSwap for false atomic
_, _, err = client.CompareAndSwap(
context.Background(),