From 7bb81baca28cb73f5d22b034587dad39c4b158b3 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 31 Jul 2023 17:10:24 +0800 Subject: [PATCH] add more log for diagnose (#914) * add more log for diagnose Signed-off-by: crazycs520 * add comment Signed-off-by: crazycs520 * refine Signed-off-by: crazycs520 * add log Signed-off-by: crazycs520 * update Signed-off-by: crazycs520 * fix Signed-off-by: crazycs520 * refine Signed-off-by: crazycs520 * fix Signed-off-by: crazycs520 * update Signed-off-by: crazycs520 --------- Signed-off-by: crazycs520 --- internal/locate/region_cache.go | 38 +++++++++++++ internal/locate/region_request.go | 90 +++++++++++++++++++++++++++++-- internal/retry/backoff.go | 14 +++++ kv/store_vars.go | 16 ++++++ tikvrpc/tikvrpc.go | 48 +++++++++++++++++ 5 files changed, 202 insertions(+), 4 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index c12dd436f..00d79bada 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2328,6 +2328,24 @@ const ( tombstone ) +// String implements fmt.Stringer interface. +func (s resolveState) String() string { + switch s { + case unresolved: + return "unresolved" + case resolved: + return "resolved" + case needCheck: + return "needCheck" + case deleted: + return "deleted" + case tombstone: + return "tombstone" + default: + return fmt.Sprintf("unknown-%v", uint64(s)) + } +} + // IsTiFlash returns true if the storeType is TiFlash func (s *Store) IsTiFlash() bool { return s.storeType == tikvrpc.TiFlash @@ -2462,6 +2480,12 @@ func (s *Store) changeResolveStateTo(from, to resolveState) bool { return false } if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) { + logutil.BgLogger().Info("change store resolve state", + zap.Uint64("store", s.storeID), + zap.String("addr", s.addr), + zap.String("from", from.String()), + zap.String("to", to.String()), + zap.String("liveness-state", s.getLivenessState().String())) return true } } @@ -2549,6 +2573,20 @@ const ( unknown ) +// String implements fmt.Stringer interface. +func (s livenessState) String() string { + switch s { + case unreachable: + return "unreachable" + case reachable: + return "reachable" + case unknown: + return "unknown" + default: + return fmt.Sprintf("unknown-%v", uint32(s)) + } +} + func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) { // This mechanism doesn't support non-TiKV stores currently. if s.storeType != tikvrpc.TiKV { diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 0928e352e..6feb7c884 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -35,6 +35,7 @@ package locate import ( + "bytes" "context" "fmt" "math/rand" @@ -607,11 +608,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector } // If there is no candidate, fallback to the leader. 
if selector.targetIdx < 0 { + leader := selector.replicas[state.leaderIdx] + leaderInvalid := leader.isEpochStale() || state.IsLeaderExhausted(leader) if len(state.option.labels) > 0 { - logutil.BgLogger().Warn("unable to find stores with given labels") + logutil.BgLogger().Warn("unable to find stores with given labels", + zap.Uint64("region", selector.region.GetID()), + zap.Bool("leader-invalid", leaderInvalid), + zap.Any("labels", state.option.labels)) } - leader := selector.replicas[state.leaderIdx] - if leader.isEpochStale() || state.IsLeaderExhausted(leader) { + if leaderInvalid { metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() selector.invalidateRegion() return nil, nil @@ -1084,6 +1089,7 @@ func (s *RegionRequestSender) SendReqCtx( }() } + totalErrors := make(map[string]int) for { if tryTimes > 0 { req.IsRetryRequest = true @@ -1112,7 +1118,7 @@ func (s *RegionRequestSender) SendReqCtx( // TODO: Change the returned error to something like "region missing in cache", // and handle this error like EpochNotMatch, which means to re-split the request and retry. - logutil.Logger(bo.GetCtx()).Debug("throwing pseudo region error due to region not found in cache", zap.Stringer("region", ®ionID)) + s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, tryTimes, req, totalErrors) resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}) return resp, nil, err } @@ -1138,6 +1144,8 @@ func (s *RegionRequestSender) SendReqCtx( var retry bool resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) if err != nil { + msg := fmt.Sprintf("send request failed, err: %v", err.Error()) + s.logSendReqError(bo, msg, regionID, tryTimes, req, totalErrors) return nil, nil, err } @@ -1162,14 +1170,19 @@ func (s *RegionRequestSender) SendReqCtx( return nil, nil, err } if regionErr != nil { + regionErrLabel := regionErrorToLabel(regionErr) + totalErrors[regionErrLabel]++ retry, err = s.onRegionError(bo, rpcCtx, req, regionErr) if err != nil { + msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error()) + s.logSendReqError(bo, msg, regionID, tryTimes, req, totalErrors) return nil, nil, err } if retry { tryTimes++ continue } + s.logSendReqError(bo, "send request meet region error without retry", regionID, tryTimes, req, totalErrors) } else { if s.replicaSelector != nil { s.replicaSelector.onSendSuccess() @@ -1182,6 +1195,75 @@ func (s *RegionRequestSender) SendReqCtx( } } +func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, totalErrors map[string]int) { + var replicaStatus []string + replicaSelectorState := "nil" + cacheRegionIsValid := "unknown" + if s.replicaSelector != nil { + switch s.replicaSelector.state.(type) { + case *accessKnownLeader: + replicaSelectorState = "accessKnownLeader" + case *accessFollower: + replicaSelectorState = "accessFollower" + case *accessByKnownProxy: + replicaSelectorState = "accessByKnownProxy" + case *tryFollower: + replicaSelectorState = "tryFollower" + case *tryNewProxy: + replicaSelectorState = "tryNewProxy" + case *invalidLeader: + replicaSelectorState = "invalidLeader" + case *invalidStore: + replicaSelectorState = "invalidStore" + case *stateBase: + replicaSelectorState = "stateBase" + case nil: + replicaSelectorState = "nil" + } + if s.replicaSelector.region != nil { + if s.replicaSelector.region.isValid() { + cacheRegionIsValid = "true" + } else { + 
cacheRegionIsValid = "false" + } + } + for _, replica := range s.replicaSelector.replicas { + replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v", + replica.peer.GetId(), + replica.store.storeID, + replica.isEpochStale(), + replica.attempts, + replica.epoch, + atomic.LoadUint32(&replica.store.epoch), + replica.store.getResolveState(), + replica.store.getLivenessState(), + )) + } + } + var totalErrorStr bytes.Buffer + for err, cnt := range totalErrors { + if totalErrorStr.Len() > 0 { + totalErrorStr.WriteString(", ") + } + totalErrorStr.WriteString(err) + totalErrorStr.WriteString(":") + totalErrorStr.WriteString(strconv.Itoa(cnt)) + } + logutil.Logger(bo.GetCtx()).Info(msg, + zap.Uint64("req-ts", req.GetStartTS()), + zap.String("req-type", req.Type.String()), + zap.String("region", regionID.String()), + zap.String("region-is-valid", cacheRegionIsValid), + zap.Int("retry-times", retryTimes), + zap.String("replica-read-type", req.ReplicaReadType.String()), + zap.String("replica-selector-state", replicaSelectorState), + zap.Bool("stale-read", req.StaleRead), + zap.String("replica-status", strings.Join(replicaStatus, "; ")), + zap.Int("total-backoff-ms", bo.GetTotalSleep()), + zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()), + zap.String("total-region-errors", totalErrorStr.String())) +} + // RPCCancellerCtxKey is context key attach rpc send cancelFunc collector to ctx. type RPCCancellerCtxKey struct{} diff --git a/internal/retry/backoff.go b/internal/retry/backoff.go index 6a27d0593..bdefc7993 100644 --- a/internal/retry/backoff.go +++ b/internal/retry/backoff.go @@ -35,9 +35,11 @@ package retry import ( + "bytes" "context" "fmt" "math" + "strconv" "strings" "sync/atomic" "time" @@ -150,6 +152,18 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e errMsg += "\n" + err.Error() } } + var backoffDetail bytes.Buffer + totalTimes := 0 + for name, times := range b.backoffTimes { + totalTimes += times + if backoffDetail.Len() > 0 { + backoffDetail.WriteString(", ") + } + backoffDetail.WriteString(name) + backoffDetail.WriteString(":") + backoffDetail.WriteString(strconv.Itoa(times)) + } + errMsg += fmt.Sprintf("\ntotal-backoff-times: %v, backoff-detail: %v", totalTimes, backoffDetail.String()) returnedErr := err if longestSleepCfg != nil { errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime) diff --git a/kv/store_vars.go b/kv/store_vars.go index d0abff21d..f66cbaff7 100644 --- a/kv/store_vars.go +++ b/kv/store_vars.go @@ -35,6 +35,8 @@ package kv import ( + "fmt" + "go.uber.org/atomic" ) @@ -68,3 +70,17 @@ const ( func (r ReplicaReadType) IsFollowerRead() bool { return r != ReplicaReadLeader } + +// String implements fmt.Stringer interface. +func (r ReplicaReadType) String() string { + switch r { + case ReplicaReadLeader: + return "leader" + case ReplicaReadFollower: + return "follower" + case ReplicaReadMixed: + return "mixed" + default: + return fmt.Sprintf("unknown-%v", byte(r)) + } +} diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index bc78d8b65..33a4e05dd 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -1271,3 +1271,51 @@ func (req *Request) IsTxnWriteRequest() bool { // ResourceGroupTagger is used to fill the ResourceGroupTag in the kvrpcpb.Context. type ResourceGroupTagger func(req *Request) + +// GetStartTS returns the `start_ts` of the request. 
+func (req *Request) GetStartTS() uint64 {
+	switch req.Type {
+	case CmdGet:
+		return req.Get().GetVersion()
+	case CmdScan:
+		return req.Scan().GetVersion()
+	case CmdPrewrite:
+		return req.Prewrite().GetStartVersion()
+	case CmdCommit:
+		return req.Commit().GetStartVersion()
+	case CmdCleanup:
+		return req.Cleanup().GetStartVersion()
+	case CmdBatchGet:
+		return req.BatchGet().GetVersion()
+	case CmdBatchRollback:
+		return req.BatchRollback().GetStartVersion()
+	case CmdScanLock:
+		return req.ScanLock().GetMaxVersion()
+	case CmdResolveLock:
+		return req.ResolveLock().GetStartVersion()
+	case CmdPessimisticLock:
+		return req.PessimisticLock().GetStartVersion()
+	case CmdPessimisticRollback:
+		return req.PessimisticRollback().GetStartVersion()
+	case CmdTxnHeartBeat:
+		return req.TxnHeartBeat().GetStartVersion()
+	case CmdCheckTxnStatus:
+		return req.CheckTxnStatus().GetLockTs()
+	case CmdCheckSecondaryLocks:
+		return req.CheckSecondaryLocks().GetStartVersion()
+	case CmdFlashbackToVersion:
+		return req.FlashbackToVersion().GetStartTs()
+	case CmdPrepareFlashbackToVersion:
+		return req.PrepareFlashbackToVersion().GetStartTs()
+	case CmdCop:
+		return req.Cop().GetStartTs()
+	case CmdCopStream:
+		return req.Cop().GetStartTs()
+	case CmdBatchCop:
+		return req.BatchCop().GetStartTs()
+	case CmdMvccGetByStartTs:
+		return req.MvccGetByStartTs().GetStartTs()
+	default:
+	}
+	return 0
+}
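
Note: a minimal usage sketch of the new GetStartTS accessor, assuming the patched client-go and kvproto modules are on the module path; the key and timestamp literals below are made up for illustration.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/v2/tikvrpc"
)

func main() {
	// Build a point-get request with an explicit read version (its start_ts).
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
		Key:     []byte("example-key"), // hypothetical key
		Version: 434742889802694657,    // hypothetical TSO timestamp
	})

	// GetStartTS extracts the per-command start_ts; SendReqCtx now attaches it
	// to its diagnostic log entry as the "req-ts" field.
	fmt.Println(req.GetStartTS()) // prints 434742889802694657
}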
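
The same "name:count" serialization appears twice in the patch: for the per-label region-error counters logged as "total-region-errors", and for the per-type backoff counters appended to the backoff error message. The standalone sketch below isolates that pattern; formatCounts is an illustrative helper name, not something the patch defines.

package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// formatCounts renders a counter map as comma-separated "name:count" pairs,
// mirroring how the patch serializes totalErrors and backoffTimes.
func formatCounts(counts map[string]int) string {
	var buf bytes.Buffer
	for name, cnt := range counts {
		if buf.Len() > 0 {
			buf.WriteString(", ")
		}
		buf.WriteString(name)
		buf.WriteString(":")
		buf.WriteString(strconv.Itoa(cnt))
	}
	return buf.String()
}

func main() {
	// Map iteration order is not deterministic, so pair order may vary, e.g.
	// "not_leader:2, epoch_not_match:1".
	fmt.Println(formatCounts(map[string]int{"not_leader": 2, "epoch_not_match": 1}))
}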