Skip to content

Commit

Permalink
add more log for diagnose (#914)
Browse files Browse the repository at this point in the history
* add more log for diagnose

Signed-off-by: crazycs520 <[email protected]>

* add comment

Signed-off-by: crazycs520 <[email protected]>

* refine

Signed-off-by: crazycs520 <[email protected]>

* add log

Signed-off-by: crazycs520 <[email protected]>

* update

Signed-off-by: crazycs520 <[email protected]>

* fix

Signed-off-by: crazycs520 <[email protected]>

* refine

Signed-off-by: crazycs520 <[email protected]>

* fix

Signed-off-by: crazycs520 <[email protected]>

* update

Signed-off-by: crazycs520 <[email protected]>

---------

Signed-off-by: crazycs520 <[email protected]>
  • Loading branch information
crazycs520 authored Jul 31, 2023
1 parent 8ed240d commit 7bb81ba
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 4 deletions.
38 changes: 38 additions & 0 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2328,6 +2328,24 @@ const (
tombstone
)

// String implements fmt.Stringer interface.
func (s resolveState) String() string {
switch s {
case unresolved:
return "unresolved"
case resolved:
return "resolved"
case needCheck:
return "needCheck"
case deleted:
return "deleted"
case tombstone:
return "tombstone"
default:
return fmt.Sprintf("unknown-%v", uint64(s))
}
}

// IsTiFlash returns true if the storeType is TiFlash
func (s *Store) IsTiFlash() bool {
return s.storeType == tikvrpc.TiFlash
Expand Down Expand Up @@ -2462,6 +2480,12 @@ func (s *Store) changeResolveStateTo(from, to resolveState) bool {
return false
}
if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) {
logutil.BgLogger().Info("change store resolve state",
zap.Uint64("store", s.storeID),
zap.String("addr", s.addr),
zap.String("from", from.String()),
zap.String("to", to.String()),
zap.String("liveness-state", s.getLivenessState().String()))
return true
}
}
Expand Down Expand Up @@ -2549,6 +2573,20 @@ const (
unknown
)

// String implements fmt.Stringer interface.
func (s livenessState) String() string {
switch s {
case unreachable:
return "unreachable"
case reachable:
return "reachable"
case unknown:
return "unknown"
default:
return fmt.Sprintf("unknown-%v", uint32(s))
}
}

func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
// This mechanism doesn't support non-TiKV stores currently.
if s.storeType != tikvrpc.TiKV {
Expand Down
90 changes: 86 additions & 4 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
package locate

import (
"bytes"
"context"
"fmt"
"math/rand"
Expand Down Expand Up @@ -607,11 +608,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
leader := selector.replicas[state.leaderIdx]
leaderInvalid := leader.isEpochStale() || state.IsLeaderExhausted(leader)
if len(state.option.labels) > 0 {
logutil.BgLogger().Warn("unable to find stores with given labels")
logutil.BgLogger().Warn("unable to find stores with given labels",
zap.Uint64("region", selector.region.GetID()),
zap.Bool("leader-invalid", leaderInvalid),
zap.Any("labels", state.option.labels))
}
leader := selector.replicas[state.leaderIdx]
if leader.isEpochStale() || state.IsLeaderExhausted(leader) {
if leaderInvalid {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
Expand Down Expand Up @@ -1084,6 +1089,7 @@ func (s *RegionRequestSender) SendReqCtx(
}()
}

totalErrors := make(map[string]int)
for {
if tryTimes > 0 {
req.IsRetryRequest = true
Expand Down Expand Up @@ -1112,7 +1118,7 @@ func (s *RegionRequestSender) SendReqCtx(

// TODO: Change the returned error to something like "region missing in cache",
// and handle this error like EpochNotMatch, which means to re-split the request and retry.
logutil.Logger(bo.GetCtx()).Debug("throwing pseudo region error due to region not found in cache", zap.Stringer("region", &regionID))
s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, tryTimes, req, totalErrors)
resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
return resp, nil, err
}
Expand All @@ -1138,6 +1144,8 @@ func (s *RegionRequestSender) SendReqCtx(
var retry bool
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
if err != nil {
msg := fmt.Sprintf("send request failed, err: %v", err.Error())
s.logSendReqError(bo, msg, regionID, tryTimes, req, totalErrors)
return nil, nil, err
}

Expand All @@ -1162,14 +1170,19 @@ func (s *RegionRequestSender) SendReqCtx(
return nil, nil, err
}
if regionErr != nil {
regionErrLabel := regionErrorToLabel(regionErr)
totalErrors[regionErrLabel]++
retry, err = s.onRegionError(bo, rpcCtx, req, regionErr)
if err != nil {
msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error())
s.logSendReqError(bo, msg, regionID, tryTimes, req, totalErrors)
return nil, nil, err
}
if retry {
tryTimes++
continue
}
s.logSendReqError(bo, "send request meet region error without retry", regionID, tryTimes, req, totalErrors)
} else {
if s.replicaSelector != nil {
s.replicaSelector.onSendSuccess()
Expand All @@ -1182,6 +1195,75 @@ func (s *RegionRequestSender) SendReqCtx(
}
}

func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, totalErrors map[string]int) {
var replicaStatus []string
replicaSelectorState := "nil"
cacheRegionIsValid := "unknown"
if s.replicaSelector != nil {
switch s.replicaSelector.state.(type) {
case *accessKnownLeader:
replicaSelectorState = "accessKnownLeader"
case *accessFollower:
replicaSelectorState = "accessFollower"
case *accessByKnownProxy:
replicaSelectorState = "accessByKnownProxy"
case *tryFollower:
replicaSelectorState = "tryFollower"
case *tryNewProxy:
replicaSelectorState = "tryNewProxy"
case *invalidLeader:
replicaSelectorState = "invalidLeader"
case *invalidStore:
replicaSelectorState = "invalidStore"
case *stateBase:
replicaSelectorState = "stateBase"
case nil:
replicaSelectorState = "nil"
}
if s.replicaSelector.region != nil {
if s.replicaSelector.region.isValid() {
cacheRegionIsValid = "true"
} else {
cacheRegionIsValid = "false"
}
}
for _, replica := range s.replicaSelector.replicas {
replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v",
replica.peer.GetId(),
replica.store.storeID,
replica.isEpochStale(),
replica.attempts,
replica.epoch,
atomic.LoadUint32(&replica.store.epoch),
replica.store.getResolveState(),
replica.store.getLivenessState(),
))
}
}
var totalErrorStr bytes.Buffer
for err, cnt := range totalErrors {
if totalErrorStr.Len() > 0 {
totalErrorStr.WriteString(", ")
}
totalErrorStr.WriteString(err)
totalErrorStr.WriteString(":")
totalErrorStr.WriteString(strconv.Itoa(cnt))
}
logutil.Logger(bo.GetCtx()).Info(msg,
zap.Uint64("req-ts", req.GetStartTS()),
zap.String("req-type", req.Type.String()),
zap.String("region", regionID.String()),
zap.String("region-is-valid", cacheRegionIsValid),
zap.Int("retry-times", retryTimes),
zap.String("replica-read-type", req.ReplicaReadType.String()),
zap.String("replica-selector-state", replicaSelectorState),
zap.Bool("stale-read", req.StaleRead),
zap.String("replica-status", strings.Join(replicaStatus, "; ")),
zap.Int("total-backoff-ms", bo.GetTotalSleep()),
zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()),
zap.String("total-region-errors", totalErrorStr.String()))
}

// RPCCancellerCtxKey is context key attach rpc send cancelFunc collector to ctx.
type RPCCancellerCtxKey struct{}

Expand Down
14 changes: 14 additions & 0 deletions internal/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
package retry

import (
"bytes"
"context"
"fmt"
"math"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -150,6 +152,18 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
errMsg += "\n" + err.Error()
}
}
var backoffDetail bytes.Buffer
totalTimes := 0
for name, times := range b.backoffTimes {
totalTimes += times
if backoffDetail.Len() > 0 {
backoffDetail.WriteString(", ")
}
backoffDetail.WriteString(name)
backoffDetail.WriteString(":")
backoffDetail.WriteString(strconv.Itoa(times))
}
errMsg += fmt.Sprintf("\ntotal-backoff-times: %v, backoff-detail: %v", totalTimes, backoffDetail.String())
returnedErr := err
if longestSleepCfg != nil {
errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime)
Expand Down
16 changes: 16 additions & 0 deletions kv/store_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
package kv

import (
"fmt"

"go.uber.org/atomic"
)

Expand Down Expand Up @@ -68,3 +70,17 @@ const (
func (r ReplicaReadType) IsFollowerRead() bool {
return r != ReplicaReadLeader
}

// String implements fmt.Stringer interface.
func (r ReplicaReadType) String() string {
switch r {
case ReplicaReadLeader:
return "leader"
case ReplicaReadFollower:
return "follower"
case ReplicaReadMixed:
return "mixed"
default:
return fmt.Sprintf("unknown-%v", byte(r))
}
}
48 changes: 48 additions & 0 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,3 +1271,51 @@ func (req *Request) IsTxnWriteRequest() bool {

// ResourceGroupTagger is used to fill the ResourceGroupTag in the kvrpcpb.Context.
type ResourceGroupTagger func(req *Request)

// GetStartTS returns the `start_ts` of the request.
func (req *Request) GetStartTS() uint64 {
switch req.Type {
case CmdGet:
return req.Get().GetVersion()
case CmdScan:
return req.Scan().GetVersion()
case CmdPrewrite:
return req.Prewrite().GetStartVersion()
case CmdCommit:
return req.Commit().GetStartVersion()
case CmdCleanup:
return req.Cleanup().GetStartVersion()
case CmdBatchGet:
return req.BatchGet().GetVersion()
case CmdBatchRollback:
return req.BatchRollback().GetStartVersion()
case CmdScanLock:
return req.ScanLock().GetMaxVersion()
case CmdResolveLock:
return req.ResolveLock().GetStartVersion()
case CmdPessimisticLock:
return req.PessimisticLock().GetStartVersion()
case CmdPessimisticRollback:
return req.PessimisticRollback().GetStartVersion()
case CmdTxnHeartBeat:
return req.TxnHeartBeat().GetStartVersion()
case CmdCheckTxnStatus:
return req.CheckTxnStatus().GetLockTs()
case CmdCheckSecondaryLocks:
return req.CheckSecondaryLocks().GetStartVersion()
case CmdFlashbackToVersion:
return req.FlashbackToVersion().GetStartTs()
case CmdPrepareFlashbackToVersion:
req.PrepareFlashbackToVersion().GetStartTs()
case CmdCop:
return req.Cop().GetStartTs()
case CmdCopStream:
return req.Cop().GetStartTs()
case CmdBatchCop:
return req.BatchCop().GetStartTs()
case CmdMvccGetByStartTs:
return req.MvccGetByStartTs().GetStartTs()
default:
}
return 0
}

0 comments on commit 7bb81ba

Please sign in to comment.