add more log for diagnose #914

Merged (11 commits) on Jul 31, 2023
38 changes: 38 additions & 0 deletions internal/locate/region_cache.go
@@ -2328,6 +2328,24 @@ const (
tombstone
)

// String implements fmt.Stringer interface.
func (s resolveState) String() string {
switch s {
case unresolved:
return "unresolved"
case resolved:
return "resolved"
case needCheck:
return "needCheck"
case deleted:
return "deleted"
case tombstone:
return "tombstone"
default:
return fmt.Sprintf("unknown-%v", uint64(s))
}
}

// IsTiFlash returns true if the storeType is TiFlash
func (s *Store) IsTiFlash() bool {
return s.storeType == tikvrpc.TiFlash
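With the Stringer in place, fmt and zap render the state as a readable name instead of a bare integer. A minimal sketch (it would have to run inside the locate package, since resolveState is unexported; the out-of-range value is made up):

	fmt.Println(needCheck)        // "needCheck"
	fmt.Println(resolveState(99)) // "unknown-99", via the default branch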
@@ -2462,6 +2480,12 @@ func (s *Store) changeResolveStateTo(from, to resolveState) bool {
return false
}
if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) {
logutil.BgLogger().Info("change store resolve state",
zap.Uint64("store", s.storeID),
zap.String("addr", s.addr),
zap.String("from", from.String()),
zap.String("to", to.String()),
zap.String("liveness-state", s.getLivenessState().String()))
return true
}
}
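The Info log sits inside the CompareAndSwap success branch, so even when several goroutines race to perform the same transition, exactly one of them logs it. The shape of the pattern as a standalone sketch (toy names, not the library's API):

	package main

	import (
		"log"
		"sync/atomic"
	)

	var state uint64

	// Log only after a successful CAS: the winner records the transition,
	// losers return false without emitting a duplicate line.
	func transition(from, to uint64) bool {
		if atomic.CompareAndSwapUint64(&state, from, to) {
			log.Printf("state changed: %d -> %d", from, to)
			return true
		}
		return false
	}

	func main() {
		transition(0, 1) // succeeds and logs
		transition(0, 1) // CAS fails silently; state is already 1
	}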
@@ -2549,6 +2573,20 @@ const (
unknown
)

// String implements fmt.Stringer interface.
func (s livenessState) String() string {
switch s {
case unreachable:
return "unreachable"
case reachable:
return "reachable"
case unknown:
return "unknown"
default:
return fmt.Sprintf("unknown-%v", uint32(s))
}
}

func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
// This mechanism doesn't support non-TiKV stores currently.
if s.storeType != tikvrpc.TiKV {
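One subtlety in the livenessState Stringer above: the type already has a named unknown state, so the default branch's "unknown-%v" form is distinguishable from it only by the numeric suffix. Inside the package (the out-of-range value is made up):

	fmt.Println(unknown)          // "unknown", the named state
	fmt.Println(livenessState(7)) // "unknown-7", an out-of-range value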
90 changes: 86 additions & 4 deletions internal/locate/region_request.go
@@ -35,6 +35,7 @@
package locate

import (
"bytes"
"context"
"fmt"
"math/rand"
@@ -607,11 +608,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
leader := selector.replicas[state.leaderIdx]
leaderInvalid := leader.isEpochStale() || state.IsLeaderExhausted(leader)
if len(state.option.labels) > 0 {
logutil.BgLogger().Warn("unable to find stores with given labels")
logutil.BgLogger().Warn("unable to find stores with given labels",
zap.Uint64("region", selector.region.GetID()),
zap.Bool("leader-invalid", leaderInvalid),
zap.Any("labels", state.option.labels))
}
- leader := selector.replicas[state.leaderIdx]
- if leader.isEpochStale() || state.IsLeaderExhausted(leader) {
if leaderInvalid {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
@@ -1084,6 +1089,7 @@ func (s *RegionRequestSender) SendReqCtx(
}()
}

totalErrors := make(map[string]int)
for {
if tryTimes > 0 {
req.IsRetryRequest = true
@@ -1112,7 +1118,7 @@

// TODO: Change the returned error to something like "region missing in cache",
// and handle this error like EpochNotMatch, which means to re-split the request and retry.
- logutil.Logger(bo.GetCtx()).Debug("throwing pseudo region error due to region not found in cache", zap.Stringer("region", &regionID))
s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, tryTimes, req, totalErrors)
resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
return resp, nil, err
}
@@ -1138,6 +1144,8 @@
var retry bool
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
if err != nil {
msg := fmt.Sprintf("send request failed, err: %v", err.Error())
s.logSendReqError(bo, msg, regionID, tryTimes, req, totalErrors)
return nil, nil, err
}

@@ -1162,14 +1170,19 @@
return nil, nil, err
}
if regionErr != nil {
regionErrLabel := regionErrorToLabel(regionErr)
totalErrors[regionErrLabel]++
retry, err = s.onRegionError(bo, rpcCtx, req, regionErr)
if err != nil {
msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error())
s.logSendReqError(bo, msg, regionID, tryTimes, req, totalErrors)
return nil, nil, err
}
if retry {
tryTimes++
continue
}
s.logSendReqError(bo, "send request meet region error without retry", regionID, tryTimes, req, totalErrors)
} else {
if s.replicaSelector != nil {
s.replicaSelector.onSendSuccess()
@@ -1182,6 +1195,75 @@
}
}

func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, totalErrors map[string]int) {
var replicaStatus []string
replicaSelectorState := "nil"
cacheRegionIsValid := "unknown"
if s.replicaSelector != nil {
switch s.replicaSelector.state.(type) {
case *accessKnownLeader:
replicaSelectorState = "accessKnownLeader"
case *accessFollower:
replicaSelectorState = "accessFollower"
case *accessByKnownProxy:
replicaSelectorState = "accessByKnownProxy"
case *tryFollower:
replicaSelectorState = "tryFollower"
case *tryNewProxy:
replicaSelectorState = "tryNewProxy"
case *invalidLeader:
replicaSelectorState = "invalidLeader"
case *invalidStore:
replicaSelectorState = "invalidStore"
case *stateBase:
replicaSelectorState = "stateBase"
case nil:
replicaSelectorState = "nil"
}
if s.replicaSelector.region != nil {
if s.replicaSelector.region.isValid() {
cacheRegionIsValid = "true"
} else {
cacheRegionIsValid = "false"
}
}
for _, replica := range s.replicaSelector.replicas {
replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v",
replica.peer.GetId(),
replica.store.storeID,
replica.isEpochStale(),
replica.attempts,
replica.epoch,
atomic.LoadUint32(&replica.store.epoch),
replica.store.getResolveState(),
replica.store.getLivenessState(),
))
}
}
var totalErrorStr bytes.Buffer
for err, cnt := range totalErrors {
if totalErrorStr.Len() > 0 {
totalErrorStr.WriteString(", ")
}
totalErrorStr.WriteString(err)
totalErrorStr.WriteString(":")
totalErrorStr.WriteString(strconv.Itoa(cnt))
}
logutil.Logger(bo.GetCtx()).Info(msg,
zap.Uint64("req-ts", req.GetStartTS()),
zap.String("req-type", req.Type.String()),
zap.String("region", regionID.String()),
zap.String("region-is-valid", cacheRegionIsValid),
zap.Int("retry-times", retryTimes),
zap.String("replica-read-type", req.ReplicaReadType.String()),
zap.String("replica-selector-state", replicaSelectorState),
zap.Bool("stale-read", req.StaleRead),
zap.String("replica-status", strings.Join(replicaStatus, "; ")),
zap.Int("total-backoff-ms", bo.GetTotalSleep()),
zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()),
zap.String("total-region-errors", totalErrorStr.String()))
}
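For the `total-region-errors` field, the map accumulated across retries is flattened into `label:count` pairs. A self-contained sketch of just that aggregation (the label names are illustrative):

	package main

	import (
		"bytes"
		"fmt"
		"strconv"
	)

	func main() {
		// Counts collected across retries, keyed by region-error label.
		totalErrors := map[string]int{"not_leader": 2, "epoch_not_match": 1}
		var buf bytes.Buffer
		for err, cnt := range totalErrors {
			if buf.Len() > 0 {
				buf.WriteString(", ")
			}
			buf.WriteString(err)
			buf.WriteString(":")
			buf.WriteString(strconv.Itoa(cnt))
		}
		fmt.Println(buf.String()) // e.g. "not_leader:2, epoch_not_match:1"; map order varies
	}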

// RPCCancellerCtxKey is the context key used to attach the RPC-send cancelFunc collector to a ctx.
type RPCCancellerCtxKey struct{}

14 changes: 14 additions & 0 deletions internal/retry/backoff.go
@@ -35,9 +35,11 @@
package retry

import (
"bytes"
"context"
"fmt"
"math"
"strconv"
"strings"
"sync/atomic"
"time"
@@ -150,6 +152,18 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
errMsg += "\n" + err.Error()
}
}
var backoffDetail bytes.Buffer
totalTimes := 0
for name, times := range b.backoffTimes {
totalTimes += times
if backoffDetail.Len() > 0 {
backoffDetail.WriteString(", ")
}
backoffDetail.WriteString(name)
backoffDetail.WriteString(":")
backoffDetail.WriteString(strconv.Itoa(times))
}
errMsg += fmt.Sprintf("\ntotal-backoff-times: %v, backoff-detail: %v", totalTimes, backoffDetail.String())
returnedErr := err
if longestSleepCfg != nil {
errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime)
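This is the same `name:count` join as in logSendReqError, here appended to the error returned when the backoff budget runs out. With hypothetical backoff-type names:

	// backoffTimes = {"regionMiss": 3, "tikvRPC": 1} would append:
	//
	//	total-backoff-times: 4, backoff-detail: regionMiss:3, tikvRPC:1
	//
	// (map iteration order is unspecified, so the pair order may vary)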
16 changes: 16 additions & 0 deletions kv/store_vars.go
@@ -35,6 +35,8 @@
package kv

import (
"fmt"

"go.uber.org/atomic"
)

@@ -68,3 +70,17 @@ const (
func (r ReplicaReadType) IsFollowerRead() bool {
return r != ReplicaReadLeader
}

// String implements fmt.Stringer interface.
func (r ReplicaReadType) String() string {
switch r {
case ReplicaReadLeader:
return "leader"
case ReplicaReadFollower:
return "follower"
case ReplicaReadMixed:
return "mixed"
default:
return fmt.Sprintf("unknown-%v", byte(r))
}
}
48 changes: 48 additions & 0 deletions tikvrpc/tikvrpc.go
@@ -1271,3 +1271,51 @@ func (req *Request) IsTxnWriteRequest() bool {

// ResourceGroupTagger is used to fill the ResourceGroupTag in the kvrpcpb.Context.
type ResourceGroupTagger func(req *Request)

// GetStartTS returns the `start_ts` of the request.
func (req *Request) GetStartTS() uint64 {
switch req.Type {
case CmdGet:
return req.Get().GetVersion()
case CmdScan:
return req.Scan().GetVersion()
case CmdPrewrite:
return req.Prewrite().GetStartVersion()
case CmdCommit:
return req.Commit().GetStartVersion()
case CmdCleanup:
return req.Cleanup().GetStartVersion()
case CmdBatchGet:
return req.BatchGet().GetVersion()
case CmdBatchRollback:
return req.BatchRollback().GetStartVersion()
case CmdScanLock:
return req.ScanLock().GetMaxVersion()
case CmdResolveLock:
return req.ResolveLock().GetStartVersion()
case CmdPessimisticLock:
return req.PessimisticLock().GetStartVersion()
case CmdPessimisticRollback:
return req.PessimisticRollback().GetStartVersion()
case CmdTxnHeartBeat:
return req.TxnHeartBeat().GetStartVersion()
case CmdCheckTxnStatus:
return req.CheckTxnStatus().GetLockTs()
case CmdCheckSecondaryLocks:
return req.CheckSecondaryLocks().GetStartVersion()
case CmdFlashbackToVersion:
return req.FlashbackToVersion().GetStartTs()
case CmdPrepareFlashbackToVersion:
return req.PrepareFlashbackToVersion().GetStartTs()
case CmdCop:
return req.Cop().GetStartTs()
case CmdCopStream:
return req.Cop().GetStartTs()
case CmdBatchCop:
return req.BatchCop().GetStartTs()
case CmdMvccGetByStartTs:
return req.MvccGetByStartTs().GetStartTs()
default:
}
return 0
}
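GetStartTS is what feeds the `req-ts` field in logSendReqError. A usage sketch, assuming the tikvrpc.NewRequest constructor and the kvproto types (the timestamp value is made up):

	import (
		"fmt"

		"github.com/pingcap/kvproto/pkg/kvrpcpb"
		"github.com/tikv/client-go/v2/tikvrpc"
	)

	func demo() {
		req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
			Key:     []byte("k"),
			Version: 434000000000000001, // the read's start_ts
		})
		fmt.Println(req.GetStartTS()) // 434000000000000001
		// Command types not covered by the switch fall through and report 0.
	}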