fix the issue that health check may set liveness wrongly (#1127) #1136

Merged
61 changes: 50 additions & 11 deletions internal/locate/region_cache.go
@@ -2448,6 +2448,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
addr = store.GetAddress()
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
newStore.livenessState = atomic.LoadUint32(&s.livenessState)
newStore.unreachableSince = s.unreachableSince
c.storeMu.Lock()
c.storeMu.stores[newStore.storeID] = newStore
c.storeMu.Unlock()
@@ -2600,49 +2602,86 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
// It may be already started by another thread.
if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
s.unreachableSince = time.Now()
go s.checkUntilHealth(c)
reResolveInterval := 30 * time.Second
if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil {
if dur, err := time.ParseDuration(val.(string)); err == nil {
reResolveInterval = dur
}
}
go s.checkUntilHealth(c, liveness, reResolveInterval)
}
}

func (s *Store) checkUntilHealth(c *RegionCache) {
defer atomic.StoreUint32(&s.livenessState, uint32(reachable))

func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResolveInterval time.Duration) {
ticker := time.NewTicker(time.Second)
defer func() {
ticker.Stop()
if liveness != reachable {
logutil.BgLogger().Warn("[health check] store was still not reachable at the end of health check loop",
zap.Uint64("storeID", s.storeID),
zap.String("state", s.getResolveState().String()),
zap.String("liveness", s.getLivenessState().String()))
}
}()
lastCheckPDTime := time.Now()

for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
if time.Since(lastCheckPDTime) > time.Second*30 {
if time.Since(lastCheckPDTime) > reResolveInterval {
lastCheckPDTime = time.Now()

valid, err := s.reResolve(c)
if err != nil {
logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
} else if !valid {
logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr))
if s.getResolveState() == deleted {
// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
c.storeMu.RLock()
newStore := c.storeMu.stores[s.storeID]
c.storeMu.RUnlock()
logutil.BgLogger().Info("[health check] store meta changed",
zap.Uint64("storeID", s.storeID),
zap.String("oldAddr", s.addr),
zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
zap.String("newAddr", newStore.addr),
zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
go newStore.checkUntilHealth(c, liveness, reResolveInterval)
}
return
}
}

bo := retry.NewNoopBackoff(c.ctx)
l := s.requestLiveness(bo, c)
if l == reachable {
liveness = s.requestLiveness(bo, c)
atomic.StoreUint32(&s.livenessState, uint32(liveness))
if liveness == reachable {
logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))

return
}
atomic.StoreUint32(&s.livenessState, uint32(l))
}
}
}

func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l livenessState) {
// It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead.
if val, err := util.EvalFailpoint("injectLiveness"); err == nil {
switch val.(string) {
liveness := val.(string)
if strings.Contains(liveness, " ") {
for _, item := range strings.Split(liveness, " ") {
kv := strings.Split(item, ":")
if len(kv) != 2 {
continue
}
if kv[0] == s.addr {
liveness = kv[1]
break
}
}
}
switch liveness {
case "unreachable":
return unreachable
case "reachable":
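
The injectLiveness failpoint handling added above accepts either a single state or space-separated addr:state pairs (the rawkv tests below pass "store1:reachable store2:unreachable"). A minimal, standalone sketch of that parsing, with a hypothetical helper name parseLiveness that is not part of client-go:

package main

import (
	"fmt"
	"strings"
)

// parseLiveness returns the liveness value configured for addr in a
// failpoint string such as "store1:reachable store2:unreachable".
// A value without spaces applies to every store; if no per-store entry
// matches, the raw value is returned unchanged, mirroring the diff above.
func parseLiveness(val, addr string) string {
	if !strings.Contains(val, " ") {
		return val
	}
	for _, item := range strings.Split(val, " ") {
		kv := strings.Split(item, ":")
		if len(kv) != 2 {
			continue
		}
		if kv[0] == addr {
			return kv[1]
		}
	}
	return val
}

func main() {
	fmt.Println(parseLiveness("store1:reachable store2:unreachable", "store2")) // unreachable
	fmt.Println(parseLiveness("unreachable", "store1"))                         // unreachable
}
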
43 changes: 43 additions & 0 deletions internal/locate/region_cache_test.go
@@ -1665,3 +1665,46 @@ func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
}, 3*time.Second, 200*time.Millisecond)
s.checkCache(remaining)
}

func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
// init region cache
s.cache.LocateKey(s.bo, []byte("a"))

store1 := s.cache.getStoreByStoreID(s.store1)
s.Require().NotNil(store1)
s.Require().Equal(resolved, store1.getResolveState())

// setup mock liveness func
store1Liveness := uint32(unreachable)
tf := func(s *Store, bo *retry.Backoffer) livenessState {
if s.storeID == store1.storeID {
return livenessState(atomic.LoadUint32(&store1Liveness))
}
return reachable
}
s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))

// start health check loop
atomic.StoreUint32(&store1.livenessState, store1Liveness)
go store1.checkUntilHealth(s.cache, livenessState(store1Liveness), time.Second)

// update store meta
s.cluster.UpdateStoreAddr(store1.storeID, store1.addr+"'", store1.labels...)

// assert that the old store should be deleted and it's not reachable
s.Eventually(func() bool {
return store1.getResolveState() == deleted && store1.getLivenessState() != reachable
}, 3*time.Second, time.Second)

// assert that the new store should be added and it's also not reachable
newStore1 := s.cache.getStoreByStoreID(s.store1)
s.Require().NotEqual(reachable, newStore1.getLivenessState())

// recover store1
atomic.StoreUint32(&store1Liveness, uint32(reachable))

// assert that the new store should be reachable
s.Eventually(func() bool {
return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
}, 3*time.Second, time.Second)
}
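
The mockRequestLiveness knob used in this test swaps the liveness probe in atomically, so the health-check goroutine sees the new function without extra locking. Below is a minimal standalone sketch of that atomic test-hook pattern; the livenessFunc signature and the constants are simplified stand-ins, not the client-go definitions:

package main

import (
	"fmt"
	"sync/atomic"
)

type livenessState uint32

const (
	reachable   livenessState = 0
	unreachable livenessState = 1
)

// livenessFunc is a stand-in for the probe hook a test can inject.
type livenessFunc func(addr string) livenessState

func main() {
	var hook atomic.Pointer[livenessFunc]

	// Install a mock probe, as the test above does with mockRequestLiveness.Store.
	tf := livenessFunc(func(addr string) livenessState {
		if addr == "store1" {
			return unreachable
		}
		return reachable
	})
	hook.Store(&tf)

	// The checking goroutine loads the hook before each probe.
	if fn := hook.Load(); fn != nil {
		fmt.Println((*fn)("store1") == unreachable) // true
	}
}
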
2 changes: 1 addition & 1 deletion internal/locate/region_request3_test.go
@@ -981,7 +981,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
regionStore := region.getStore()
leaderAddr = regionStore.stores[regionStore.workTiKVIdx].addr
s.NotEqual(leaderAddr, "")
for i := 0; i < 10; i++ {
for i := 0; i < 30; i++ {
bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
s.Nil(err)
3 changes: 2 additions & 1 deletion internal/retry/backoff_test.go
@@ -72,7 +72,8 @@ func TestBackoffErrorType(t *testing.T) {
err = b.Backoff(BoTxnNotFound, errors.New("txn not found"))
if err != nil {
// Next backoff should return error of backoff that sleeps for longest time.
assert.ErrorIs(t, err, BoTxnNotFound.err)
cfg, _ := b.longestSleepCfg()
assert.ErrorIs(t, err, cfg.err)
return
}
}
32 changes: 32 additions & 0 deletions rawkv/rawkv_test.go
@@ -41,14 +41,17 @@ import (
"hash/crc64"
"testing"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv"
)

func TestRawKV(t *testing.T) {
tikv.EnableFailpoints()
suite.Run(t, new(testRawkvSuite))
}

@@ -77,9 +80,11 @@ func (s *testRawkvSuite) SetupTest() {
s.peer1 = peerIDs[0]
s.peer2 = peerIDs[1]
s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)
s.Nil(failpoint.Enable("tikvclient/injectReResolveInterval", `return("1s")`))
}

func (s *testRawkvSuite) TearDownTest() {
s.Nil(failpoint.Disable("tikvclient/injectReResolveInterval"))
s.mvccStore.Close()
}

@@ -110,6 +115,9 @@ func (s *testRawkvSuite) TestReplaceAddrWithNewStore() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

getVal, err := client.Get(context.Background(), testKey)

s.Nil(err)
@@ -172,6 +180,9 @@ func (s *testRawkvSuite) TestReplaceNewAddrAndOldOfflineImmediately() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

getVal, err := client.Get(context.Background(), testKey)
s.Nil(err)
s.Equal(getVal, testValue)
@@ -200,6 +211,9 @@ func (s *testRawkvSuite) TestReplaceStore() {
s.cluster.RemovePeer(s.region1, s.peer1)
s.cluster.ChangeLeader(s.region1, peer3)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

err = client.Put(context.Background(), testKey, testValue)
s.Nil(err)
}
@@ -234,6 +248,9 @@ func (s *testRawkvSuite) TestColumnFamilyForClient() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test get
client.SetColumnFamily(cf1)
getVal, err := client.Get(context.Background(), testKeyCf1)
@@ -303,6 +320,9 @@ func (s *testRawkvSuite) TestColumnFamilyForOptions() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test get
getVal, err := client.Get(context.Background(), keyInCf1, SetColumnFamily(cf1))
s.Nil(err)
@@ -370,6 +390,9 @@ func (s *testRawkvSuite) TestBatch() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test BatchGet
returnValues, err := client.BatchGet(context.Background(), keys, SetColumnFamily(cf))
s.Nil(err)
@@ -426,6 +449,9 @@ func (s *testRawkvSuite) TestScan() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test scan
startKey, endKey := []byte("key1"), []byte("keyz")
limit := 3
@@ -498,6 +524,9 @@ func (s *testRawkvSuite) TestDeleteRange() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test DeleteRange
startKey, endKey := []byte("key3"), []byte(nil)
err = client.DeleteRange(context.Background(), startKey, endKey, SetColumnFamily(cf))
@@ -535,6 +564,9 @@ func (s *testRawkvSuite) TestCompareAndSwap() {
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)

s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")

// test CompareAndSwap for false atomic
_, _, err = client.CompareAndSwap(
context.Background(),
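
The rawkv tests above all follow the same failpoint setup: enable tikvclient/injectLiveness with per-store values, then disable it when the test ends. A minimal sketch of that pattern; the test name and package clause are illustrative, and tikv.EnableFailpoints must have been called once per process (as TestRawKV does above) for the failpoint to take effect:

package rawkv_test // illustrative; adjust to your own test package

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
	"github.com/tikv/client-go/v2/tikv"
)

// TestWithInjectedLiveness shows the enable-then-defer-disable pattern
// used by the suite above.
func TestWithInjectedLiveness(t *testing.T) {
	// client-go only evaluates failpoints after EnableFailpoints is called.
	tikv.EnableFailpoints()

	// Force store1 to look reachable and store2 unreachable for this test.
	require.NoError(t, failpoint.Enable("tikvclient/injectLiveness",
		`return("store1:reachable store2:unreachable")`))
	defer func() {
		require.NoError(t, failpoint.Disable("tikvclient/injectLiveness"))
	}()

	// ... exercise the client here; liveness probes now follow the injected values.
}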