-
Notifications
You must be signed in to change notification settings - Fork 222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix the issue that health check may set liveness wrongly #1127
Changes from all commits
7abfa0f
6fb32b7
4e8ce49
adbdf8c
a644b9e
682d1b8
859f200
4a345b6
9c77d5a
896958a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2618,6 +2618,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { | |
addr = store.GetAddress() | ||
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) { | ||
newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)} | ||
newStore.livenessState = atomic.LoadUint32(&s.livenessState) | ||
newStore.unreachableSince = s.unreachableSince | ||
if s.addr == addr { | ||
newStore.slowScore = s.slowScore | ||
} | ||
|
@@ -2783,50 +2785,84 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff | |
// It may be already started by another thread. | ||
if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) { | ||
s.unreachableSince = time.Now() | ||
go s.checkUntilHealth(c) | ||
reResolveInterval := 30 * time.Second | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Better to make it a constant value. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The rawkv tests use a failpoint to set it shorter. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How about: var DefReResolveInterval = 30 * time.Second // global scope.
func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState){
reResolveInterval = DefReResolveInterval
...
} and if a test needs a shorter reResolveInterval, change There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Personally, I would rather not do that. A global variable has to be handled carefully to avoid data races in unit tests. I've struggled with |
||
if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil { | ||
if dur, err := time.ParseDuration(val.(string)); err == nil { | ||
reResolveInterval = dur | ||
} | ||
} | ||
go s.checkUntilHealth(c, liveness, reResolveInterval) | ||
} | ||
return | ||
} | ||
|
||
func (s *Store) checkUntilHealth(c *RegionCache) { | ||
defer atomic.StoreUint32(&s.livenessState, uint32(reachable)) | ||
|
||
func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResolveInterval time.Duration) { | ||
ticker := time.NewTicker(time.Second) | ||
defer ticker.Stop() | ||
defer func() { | ||
ticker.Stop() | ||
if liveness != reachable { | ||
logutil.BgLogger().Warn("[health check] store was still not reachable at the end of health check loop", | ||
zap.Uint64("storeID", s.storeID), | ||
zap.String("state", s.getResolveState().String()), | ||
zap.String("liveness", s.getLivenessState().String())) | ||
} | ||
}() | ||
lastCheckPDTime := time.Now() | ||
|
||
for { | ||
select { | ||
case <-c.ctx.Done(): | ||
return | ||
case <-ticker.C: | ||
if time.Since(lastCheckPDTime) > time.Second*30 { | ||
if time.Since(lastCheckPDTime) > reResolveInterval { | ||
lastCheckPDTime = time.Now() | ||
|
||
valid, err := s.reResolve(c) | ||
if err != nil { | ||
logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err)) | ||
} else if !valid { | ||
logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr)) | ||
if s.getResolveState() == deleted { | ||
// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve). | ||
newStore, _ := c.getStore(s.storeID) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it necessary to add an assertion? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's guaranteed by |
||
logutil.BgLogger().Info("[health check] store meta changed", | ||
zap.Uint64("storeID", s.storeID), | ||
zap.String("oldAddr", s.addr), | ||
zap.String("oldLabels", fmt.Sprintf("%v", s.labels)), | ||
zap.String("newAddr", newStore.addr), | ||
zap.String("newLabels", fmt.Sprintf("%v", newStore.labels))) | ||
go newStore.checkUntilHealth(c, liveness, reResolveInterval) | ||
} | ||
return | ||
} | ||
} | ||
|
||
l := s.requestLiveness(c.ctx, c) | ||
if l == reachable { | ||
liveness = s.requestLiveness(c.ctx, c) | ||
atomic.StoreUint32(&s.livenessState, uint32(liveness)) | ||
if liveness == reachable { | ||
logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID)) | ||
|
||
return | ||
} | ||
atomic.StoreUint32(&s.livenessState, uint32(l)) | ||
} | ||
} | ||
} | ||
|
||
func (s *Store) requestLiveness(ctx context.Context, tk testingKnobs) (l livenessState) { | ||
// It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead. | ||
if val, err := util.EvalFailpoint("injectLiveness"); err == nil { | ||
switch val.(string) { | ||
liveness := val.(string) | ||
if strings.Contains(liveness, " ") { | ||
for _, item := range strings.Split(liveness, " ") { | ||
kv := strings.Split(item, ":") | ||
if len(kv) != 2 { | ||
continue | ||
} | ||
if kv[0] == s.addr { | ||
liveness = kv[1] | ||
break | ||
} | ||
} | ||
} | ||
switch liveness { | ||
case "unreachable": | ||
return unreachable | ||
case "reachable": | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about abstracting a
`newStore`
function that forces the caller to specify the `livenessState`
and `unreachableSince`
as input parameters? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about handling it later (as a part of #1104)? There are 4
`&Store{...}`
literals in region_cache.go now, and I'd like to handle them together according to the store lifecycle. Let this PR fix the corresponding issue for now.