From 4da22ef638588fe2f5ef18ac49c3c3aa7b9ba1ca Mon Sep 17 00:00:00 2001
From: zyguan
Date: Wed, 24 Jan 2024 15:11:01 +0800
Subject: [PATCH] fix the issue that health check may set liveness wrongly
 (#1127)

* fix the issue that health check may set liveness wrongly

Signed-off-by: zyguan

* fix lint issue

Signed-off-by: zyguan

* fix rawkv ut

Signed-off-by: zyguan

* fix data race

Signed-off-by: zyguan

* use getStore instead of accessing storeMu directly

Signed-off-by: zyguan

* make TestAccessFollowerAfter1TiKVDown stable

Signed-off-by: zyguan

* make TestBackoffErrorType stable

Signed-off-by: zyguan

* address comments

Signed-off-by: zyguan

---------

Signed-off-by: zyguan
Co-authored-by: disksing
Signed-off-by: zyguan
---
 internal/locate/region_cache.go         | 61 ++++++++++++++++++++-----
 internal/locate/region_cache_test.go    | 43 +++++++++++++++++
 internal/locate/region_request3_test.go |  2 +-
 internal/retry/backoff_test.go          |  3 +-
 rawkv/rawkv_test.go                     | 32 +++++++++++++
 5 files changed, 128 insertions(+), 13 deletions(-)

diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 110b32aa5..d78400233 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -2448,6 +2448,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
 	addr = store.GetAddress()
 	if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
 		newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
+		newStore.livenessState = atomic.LoadUint32(&s.livenessState)
+		newStore.unreachableSince = s.unreachableSince
 		c.storeMu.Lock()
 		c.storeMu.stores[newStore.storeID] = newStore
 		c.storeMu.Unlock()
@@ -2600,14 +2602,27 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessSt
 	// It may be already started by another thread.
 	if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
 		s.unreachableSince = time.Now()
-		go s.checkUntilHealth(c)
+		reResolveInterval := 30 * time.Second
+		if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil {
+			if dur, err := time.ParseDuration(val.(string)); err == nil {
+				reResolveInterval = dur
+			}
+		}
+		go s.checkUntilHealth(c, liveness, reResolveInterval)
 	}
 }
 
-func (s *Store) checkUntilHealth(c *RegionCache) {
-	defer atomic.StoreUint32(&s.livenessState, uint32(reachable))
-
+func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResolveInterval time.Duration) {
 	ticker := time.NewTicker(time.Second)
+	defer func() {
+		ticker.Stop()
+		if liveness != reachable {
+			logutil.BgLogger().Warn("[health check] store was still not reachable at the end of health check loop",
+				zap.Uint64("storeID", s.storeID),
+				zap.String("state", s.getResolveState().String()),
+				zap.String("liveness", s.getLivenessState().String()))
+		}
+	}()
 	lastCheckPDTime := time.Now()
 
 	for {
@@ -2615,26 +2630,37 @@ func (s *Store) checkUntilHealth(c *RegionCache) {
 		case <-c.ctx.Done():
 			return
 		case <-ticker.C:
-			if time.Since(lastCheckPDTime) > time.Second*30 {
+			if time.Since(lastCheckPDTime) > reResolveInterval {
 				lastCheckPDTime = time.Now()
 
 				valid, err := s.reResolve(c)
 				if err != nil {
 					logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
 				} else if !valid {
-					logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr))
+					if s.getResolveState() == deleted {
+						// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
+						c.storeMu.RLock()
+						newStore := c.storeMu.stores[s.storeID]
+						c.storeMu.RUnlock()
+						logutil.BgLogger().Info("[health check] store meta changed",
+							zap.Uint64("storeID", s.storeID),
+							zap.String("oldAddr", s.addr),
+							zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
+							zap.String("newAddr", newStore.addr),
+							zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
+						go newStore.checkUntilHealth(c, liveness, reResolveInterval)
+					}
 					return
 				}
 			}
 
 			bo := retry.NewNoopBackoff(c.ctx)
-			l := s.requestLiveness(bo, c)
-			if l == reachable {
+			liveness = s.requestLiveness(bo, c)
+			atomic.StoreUint32(&s.livenessState, uint32(liveness))
+			if liveness == reachable {
 				logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))
-
 				return
 			}
-			atomic.StoreUint32(&s.livenessState, uint32(l))
 		}
 	}
 }
@@ -2642,7 +2668,20 @@ func (s *Store) checkUntilHealth(c *RegionCache) {
 func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l livenessState) {
 	// It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead.
 	if val, err := util.EvalFailpoint("injectLiveness"); err == nil {
-		switch val.(string) {
+		liveness := val.(string)
+		if strings.Contains(liveness, " ") {
+			for _, item := range strings.Split(liveness, " ") {
+				kv := strings.Split(item, ":")
+				if len(kv) != 2 {
+					continue
+				}
+				if kv[0] == s.addr {
+					liveness = kv[1]
+					break
+				}
+			}
+		}
+		switch liveness {
 		case "unreachable":
 			return unreachable
 		case "reachable":
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index caefb2ae5..9d7961958 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -1665,3 +1665,46 @@ func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
 	}, 3*time.Second, 200*time.Millisecond)
 	s.checkCache(remaining)
 }
+
+func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
+	// init region cache
+	s.cache.LocateKey(s.bo, []byte("a"))
+
+	store1 := s.cache.getStoreByStoreID(s.store1)
+	s.Require().NotNil(store1)
+	s.Require().Equal(resolved, store1.getResolveState())
+
+	// setup mock liveness func
+	store1Liveness := uint32(unreachable)
+	tf := func(s *Store, bo *retry.Backoffer) livenessState {
+		if s.storeID == store1.storeID {
+			return livenessState(atomic.LoadUint32(&store1Liveness))
+		}
+		return reachable
+	}
+	s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
+
+	// start health check loop
+	atomic.StoreUint32(&store1.livenessState, store1Liveness)
+	go store1.checkUntilHealth(s.cache, livenessState(store1Liveness), time.Second)
+
+	// update store meta
+	s.cluster.UpdateStoreAddr(store1.storeID, store1.addr+"'", store1.labels...)
+
+	// assert that the old store should be deleted and it's not reachable
+	s.Eventually(func() bool {
+		return store1.getResolveState() == deleted && store1.getLivenessState() != reachable
+	}, 3*time.Second, time.Second)
+
+	// assert that the new store should be added and it's also not reachable
+	newStore1 := s.cache.getStoreByStoreID(s.store1)
+	s.Require().NotEqual(reachable, newStore1.getLivenessState())
+
+	// recover store1
+	atomic.StoreUint32(&store1Liveness, uint32(reachable))
+
+	// assert that the new store should be reachable
+	s.Eventually(func() bool {
+		return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
+	}, 3*time.Second, time.Second)
+}
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 9d6558b71..567c677e9 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -981,7 +981,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
 	regionStore := region.getStore()
 	leaderAddr = regionStore.stores[regionStore.workTiKVIdx].addr
 	s.NotEqual(leaderAddr, "")
-	for i := 0; i < 10; i++ {
+	for i := 0; i < 30; i++ {
 		bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
 		resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
 		s.Nil(err)
diff --git a/internal/retry/backoff_test.go b/internal/retry/backoff_test.go
index 1668c5d5d..895c77e86 100644
--- a/internal/retry/backoff_test.go
+++ b/internal/retry/backoff_test.go
@@ -72,7 +72,8 @@ func TestBackoffErrorType(t *testing.T) {
 		err = b.Backoff(BoTxnNotFound, errors.New("txn not found"))
 		if err != nil {
 			// Next backoff should return error of backoff that sleeps for longest time.
-			assert.ErrorIs(t, err, BoTxnNotFound.err)
+			cfg, _ := b.longestSleepCfg()
+			assert.ErrorIs(t, err, cfg.err)
 			return
 		}
 	}
diff --git a/rawkv/rawkv_test.go b/rawkv/rawkv_test.go
index f5eefaa2d..e9e61db0f 100644
--- a/rawkv/rawkv_test.go
+++ b/rawkv/rawkv_test.go
@@ -41,14 +41,17 @@ import (
 	"hash/crc64"
 	"testing"
 
+	"github.com/pingcap/failpoint"
 	"github.com/stretchr/testify/suite"
 	"github.com/tikv/client-go/v2/internal/locate"
 	"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
 	"github.com/tikv/client-go/v2/internal/retry"
 	"github.com/tikv/client-go/v2/kv"
+	"github.com/tikv/client-go/v2/tikv"
 )
 
 func TestRawKV(t *testing.T) {
+	tikv.EnableFailpoints()
 	suite.Run(t, new(testRawkvSuite))
 }
 
@@ -77,9 +80,11 @@ func (s *testRawkvSuite) SetupTest() {
 	s.peer1 = peerIDs[0]
 	s.peer2 = peerIDs[1]
 	s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)
+	s.Nil(failpoint.Enable("tikvclient/injectReResolveInterval", `return("1s")`))
 }
 
 func (s *testRawkvSuite) TearDownTest() {
+	s.Nil(failpoint.Disable("tikvclient/injectReResolveInterval"))
 	s.mvccStore.Close()
 }
 
@@ -110,6 +115,9 @@ func (s *testRawkvSuite) TestReplaceAddrWithNewStore() {
 	s.cluster.ChangeLeader(s.region1, s.peer2)
 	s.cluster.RemovePeer(s.region1, s.peer1)
 
+	s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
+	defer failpoint.Disable("tikvclient/injectLiveness")
+
 	getVal, err := client.Get(context.Background(), testKey)
 
 	s.Nil(err)
@@ -172,6 +180,9 @@ func (s *testRawkvSuite) TestReplaceNewAddrAndOldOfflineImmediately() {
 	s.cluster.ChangeLeader(s.region1, s.peer2)
 	s.cluster.RemovePeer(s.region1, s.peer1)
 
+	s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
+	defer failpoint.Disable("tikvclient/injectLiveness")
+
 	getVal, err := client.Get(context.Background(), testKey)
 	s.Nil(err)
 	s.Equal(getVal, testValue)
@@ -200,6 +211,9 @@ func (s *testRawkvSuite) TestReplaceStore() {
 	s.cluster.RemovePeer(s.region1, s.peer1)
 	s.cluster.ChangeLeader(s.region1, peer3)
 
+	s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
+	defer failpoint.Disable("tikvclient/injectLiveness")
+
 	err = client.Put(context.Background(), testKey, testValue)
 	s.Nil(err)
 }
@@ -234,6 +248,9 @@ func (s *testRawkvSuite) TestColumnFamilyForClient() {
 	s.cluster.ChangeLeader(s.region1, s.peer2)
 	s.cluster.RemovePeer(s.region1, s.peer1)
 
+	s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
+	defer failpoint.Disable("tikvclient/injectLiveness")
+
 	// test get
 	client.SetColumnFamily(cf1)
 	getVal, err := client.Get(context.Background(), testKeyCf1)
@@ -303,6 +320,9 @@ func (s *testRawkvSuite) TestColumnFamilyForOptions() {
 	s.cluster.ChangeLeader(s.region1, s.peer2)
 	s.cluster.RemovePeer(s.region1, s.peer1)
 
+	s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
+	defer failpoint.Disable("tikvclient/injectLiveness")
+
 	// test get
 	getVal, err := client.Get(context.Background(), keyInCf1, SetColumnFamily(cf1))
 	s.Nil(err)
@@ -370,6 +390,9 @@ func (s *testRawkvSuite) TestBatch() {
 	s.cluster.ChangeLeader(s.region1, s.peer2)
 	s.cluster.RemovePeer(s.region1, s.peer1)
 
+	s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
+	defer failpoint.Disable("tikvclient/injectLiveness")
+
 	// test BatchGet
 	returnValues, err := client.BatchGet(context.Background(), keys, SetColumnFamily(cf))
 	s.Nil(err)
@@ -426,6 +449,9 @@ func (s *testRawkvSuite) TestScan() {
 	s.cluster.ChangeLeader(s.region1, s.peer2)
 	s.cluster.RemovePeer(s.region1, s.peer1)
 
+	s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
+	defer failpoint.Disable("tikvclient/injectLiveness")
+
 	// test scan
 	startKey, endKey := []byte("key1"), []byte("keyz")
 	limit := 3
@@ -498,6 +524,9 @@ func (s *testRawkvSuite) TestDeleteRange() {
 	s.cluster.ChangeLeader(s.region1, s.peer2)
 	s.cluster.RemovePeer(s.region1, s.peer1)
 
+	s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
+	defer failpoint.Disable("tikvclient/injectLiveness")
+
 	// test DeleteRange
 	startKey, endKey := []byte("key3"), []byte(nil)
 	err = client.DeleteRange(context.Background(), startKey, endKey, SetColumnFamily(cf))
@@ -535,6 +564,9 @@ func (s *testRawkvSuite) TestCompareAndSwap() {
 	s.cluster.ChangeLeader(s.region1, s.peer2)
 	s.cluster.RemovePeer(s.region1, s.peer1)
 
+	s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
+	defer failpoint.Disable("tikvclient/injectLiveness")
+
 	// test CompareAndSwap for false atomic
 	_, _, err = client.CompareAndSwap(
 		context.Background(),
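
A standalone, illustrative Go sketch (not part of the patch above): it mirrors how the updated requestLiveness interprets the injectLiveness failpoint value that the rawkv tests now enable, e.g. `return("store1:reachable store2:unreachable")`. Each space-separated item is an addr:liveness pair, and the pair whose address matches the store wins; a value without spaces is used as-is. The helper name pickLiveness and the main function are hypothetical and exist only for this example.

package main

import (
	"fmt"
	"strings"
)

// pickLiveness returns the liveness string configured for addr, or the raw
// failpoint value itself when it contains no per-address pairs.
func pickLiveness(val, addr string) string {
	liveness := val
	if strings.Contains(liveness, " ") {
		for _, item := range strings.Split(liveness, " ") {
			// each item is expected to look like "addr:liveness"
			kv := strings.Split(item, ":")
			if len(kv) != 2 {
				continue
			}
			if kv[0] == addr {
				liveness = kv[1]
				break
			}
		}
	}
	return liveness
}

func main() {
	val := "store1:reachable store2:unreachable"
	fmt.Println(pickLiveness(val, "store1"))            // reachable
	fmt.Println(pickLiveness(val, "store2"))            // unreachable
	fmt.Println(pickLiveness("unreachable", "store1"))  // unreachable (single-value form)
}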