Skip to content

Commit

Permalink
*: add rpc error stats and refine log (pingcap#52810) (pingcap#55719)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Aug 29, 2024
1 parent da340fc commit 227028a
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 54 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7041,13 +7041,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "fbc3617843b52ad90ebafe792825cab26abb21236e99ad60ebbb43f83f05035d",
strip_prefix = "github.com/tikv/client-go/[email protected].20240807051551-73815ad261dc",
sha256 = "b7e2cea71a33b24158b97113cd79f4d64f9228faa468be7c520dcef609863f33",
strip_prefix = "github.com/tikv/client-go/[email protected].20240828024346-bc6139e2f0c4",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240807051551-73815ad261dc.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240807051551-73815ad261dc.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240807051551-73815ad261dc.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240807051551-73815ad261dc.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240828024346-bc6139e2f0c4.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240828024346-bc6139e2f0c4.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240828024346-bc6139e2f0c4.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240828024346-bc6139e2f0c4.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ require (
github.com/stretchr/testify v1.9.0
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20240807051551-73815ad261dc
github.com/tikv/client-go/v2 v2.0.8-0.20240828024346-bc6139e2f0c4
github.com/tikv/pd/client v0.0.0-20240724132535-fcb34c90790c
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -996,8 +996,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20240807051551-73815ad261dc h1:dHXK/ahoO3PhU9oqiYwB4BBNl2PpgJiZODmtWAfxygo=
github.com/tikv/client-go/v2 v2.0.8-0.20240807051551-73815ad261dc/go.mod h1:37p0ryKaieJbBpVDWnaPi2ZS6UFqkgpsemBLkGX2FvM=
github.com/tikv/client-go/v2 v2.0.8-0.20240828024346-bc6139e2f0c4 h1:BTJqx0SapNM6p5d7WEk8XJ4EvVKiSRPyxcnlQ3XTzyI=
github.com/tikv/client-go/v2 v2.0.8-0.20240828024346-bc6139e2f0c4/go.mod h1:37p0ryKaieJbBpVDWnaPi2ZS6UFqkgpsemBLkGX2FvM=
github.com/tikv/pd/client v0.0.0-20240724132535-fcb34c90790c h1:oZygf/SCdTUhjoHuZRE85EBgK0oA6LjikpWuJqqjM8U=
github.com/tikv/pd/client v0.0.0-20240724132535-fcb34c90790c/go.mod h1:NW6Af689Jw1FDxjq+WL0nqOdmQ1XT0ly2R1SIKfQuUw=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M=
Expand Down
10 changes: 6 additions & 4 deletions pkg/distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
totalProcessTime: time.Second,
totalWaitTime: time.Second,
rpcStat: tikv.NewRegionRequestRuntimeStats(),
reqStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: 15,
}
s1.copRespTime.Add(execdetails.Duration(time.Second))
Expand All @@ -130,13 +130,15 @@ func TestSelectResultRuntimeStats(t *testing.T) {
// Test for idempotence.
require.Equal(t, expect, stats.String())

s1.rpcStat.Stats[tikvrpc.CmdCop] = &tikv.RPCRuntimeStats{
s1.reqStat.RPCStats[tikvrpc.CmdCop] = &tikv.RPCRuntimeStats{
Count: 1,
Consume: int64(time.Second),
}
s1.reqStat.RecordRPCErrorStats("server_is_busy")
s1.reqStat.RecordRPCErrorStats("server_is_busy")
stmtStats.RegisterStats(2, s1)
stats = stmtStats.GetRootStats(2)
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, rpc_info:{Cop:{num_rpc:1, total_time:1s}, rpc_errors:{server_is_busy:2}}, backoff{RegionMiss: 1ms}"
require.Equal(t, expect, stats.String())
// Test for idempotence.
require.Equal(t, expect, stats.String())
Expand All @@ -145,7 +147,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
totalProcessTime: time.Second,
totalWaitTime: time.Second,
rpcStat: tikv.NewRegionRequestRuntimeStats(),
reqStat: tikv.NewRegionRequestRuntimeStats(),
}
s1.copRespTime.Add(execdetails.Duration(time.Second))
s1.procKeys.Add(100)
Expand Down
29 changes: 10 additions & 19 deletions pkg/distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/pingcap/tipb/go-tipb"
tikvmetrics "github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -549,7 +548,7 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
if r.stats == nil {
r.stats = &selectResultRuntimeStats{
backoffSleep: make(map[string]time.Duration),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
reqStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: r.distSQLConcurrency,
}
if ci, ok := r.resp.(copr.CopInfo); ok {
Expand Down Expand Up @@ -659,7 +658,7 @@ type selectResultRuntimeStats struct {
backoffSleep map[string]time.Duration
totalProcessTime time.Duration
totalWaitTime time.Duration
rpcStat tikv.RegionRequestRuntimeStats
reqStat *tikv.RegionRequestRuntimeStats
distSQLConcurrency int
extraConcurrency int
CoprCacheHitNum int64
Expand All @@ -678,7 +677,7 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim
maps.Copy(s.backoffSleep, copStats.BackoffSleep)
s.totalProcessTime += copStats.TimeDetail.ProcessTime
s.totalWaitTime += copStats.TimeDetail.WaitTime
s.rpcStat.Merge(copStats.RegionRequestRuntimeStats)
s.reqStat.Merge(copStats.ReqStats)
if copStats.CoprCacheHit {
s.CoprCacheHitNum++
}
Expand All @@ -689,7 +688,7 @@ func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
copRespTime: execdetails.Percentile[execdetails.Duration]{},
procKeys: execdetails.Percentile[execdetails.Int64]{},
backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
reqStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: s.distSQLConcurrency,
extraConcurrency: s.extraConcurrency,
CoprCacheHitNum: s.CoprCacheHitNum,
Expand All @@ -704,7 +703,7 @@ func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
}
newRs.totalProcessTime += s.totalProcessTime
newRs.totalWaitTime += s.totalWaitTime
maps.Copy(newRs.rpcStat.Stats, s.rpcStat.Stats)
newRs.reqStat = s.reqStat.Clone()
return &newRs
}

Expand All @@ -721,7 +720,7 @@ func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {
}
s.totalProcessTime += other.totalProcessTime
s.totalWaitTime += other.totalWaitTime
s.rpcStat.Merge(other.rpcStat)
s.reqStat.Merge(other.reqStat)
s.CoprCacheHitNum += other.CoprCacheHitNum
if other.distSQLConcurrency > s.distSQLConcurrency {
s.distSQLConcurrency = other.distSQLConcurrency
Expand All @@ -736,7 +735,7 @@ func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {

func (s *selectResultRuntimeStats) String() string {
buf := bytes.NewBuffer(nil)
rpcStat := s.rpcStat
reqStat := s.reqStat
if s.copRespTime.Size() > 0 {
size := s.copRespTime.Size()
if size == 1 {
Expand Down Expand Up @@ -767,15 +766,6 @@ func (s *selectResultRuntimeStats) String() string {
buf.WriteString(execdetails.FormatDuration(s.totalWaitTime))
}
}
copRPC := rpcStat.Stats[tikvrpc.CmdCop]
if copRPC != nil && copRPC.Count > 0 {
rpcStat = rpcStat.Clone()
delete(rpcStat.Stats, tikvrpc.CmdCop)
buf.WriteString(", rpc_num: ")
buf.WriteString(strconv.FormatInt(copRPC.Count, 10))
buf.WriteString(", rpc_time: ")
buf.WriteString(execdetails.FormatDuration(time.Duration(copRPC.Consume)))
}
if config.GetGlobalConfig().TiKVClient.CoprCache.CapacityMB > 0 {
fmt.Fprintf(buf, ", copr_cache_hit_ratio: %v",
strconv.FormatFloat(s.calcCacheHit(), 'f', 2, 64))
Expand Down Expand Up @@ -805,10 +795,11 @@ func (s *selectResultRuntimeStats) String() string {
buf.WriteString("}")
}

rpcStatsStr := rpcStat.String()
rpcStatsStr := reqStat.String()
if len(rpcStatsStr) > 0 {
buf.WriteString(", ")
buf.WriteString(", rpc_info:{")
buf.WriteString(rpcStatsStr)
buf.WriteString("}")
}

if len(s.backoffSleep) > 0 {
Expand Down
16 changes: 8 additions & 8 deletions pkg/executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,16 +427,16 @@ func TestCoprocessorPagingSize(t *testing.T) {
// Check 'rpc_num' in the execution information
//
// mysql> explain analyze select * from t_paging;
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | id |task | execution info |
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | TableReader_5 |root | time:7.27ms, loops:2, cop_task: {num: 10, max: 1.57ms, min: 313.3µs, avg: 675.9µs, p95: 1.57ms, tot_proc: 2ms, rpc_num: 10, rpc_time: 6.69ms, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15} |
// | └─TableFullScan_4 |cop[tikv] | tikv_task:{proc max:1.48ms, min:294µs, avg: 629µs, p80:1.21ms, p95:1.48ms, iters:0, tasks:10} |
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// +--------------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | id |task | execution info |
// +--------------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | TableReader_5 |root | time:7.27ms, loops:2, cop_task: {num: 10, max: 1.57ms, min: 313.3µs, avg: 675.9µs, p95: 1.57ms, tot_proc: 2ms, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, rpc_info:{Cop:{num_rpc:10, total_time:6.69ms}} |
// | └─TableFullScan_4 |cop[tikv] | tikv_task:{proc max:1.48ms, min:294µs, avg: 629µs, p80:1.21ms, p95:1.48ms, iters:0, tasks:10} |
// +--------------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// 2 rows in set (0.01 sec)

getRPCNumFromExplain := func(rows [][]interface{}) (res uint64) {
re := regexp.MustCompile("rpc_num: ([0-9]+)")
getRPCNumFromExplain := func(rows [][]any) (res uint64) {
re := regexp.MustCompile("num_rpc:([0-9]+)")
for _, row := range rows {
buf := bytes.NewBufferString("")
_, _ = fmt.Fprintf(buf, "%s\n", row)
Expand Down
10 changes: 5 additions & 5 deletions pkg/executor/executor_failpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func TestCollectCopRuntimeStats(t *testing.T) {
rows := tk.MustQuery("explain analyze select * from t1").Rows()
require.Len(t, rows, 2)
explain := fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*rpc_num: .*, .*regionMiss:.*", explain)
require.Regexp(t, ".*num_rpc:.*, .*regionMiss:.*", explain)
require.NoError(t, failpoint.Disable("tikvclient/tikvStoreRespResult"))
}

Expand Down Expand Up @@ -586,15 +586,15 @@ func TestTiKVClientReadTimeout(t *testing.T) {
rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:2.*", explain)

// Test for stale read.
tk.MustExec("set @a=now(6);")
tk.MustExec("set @@tidb_replica_read='closest-replicas';")
rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t as of timestamp(@a) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:2.*", explain)

// Test for tikv_client_read_timeout session variable.
tk.MustExec("set @@tikv_client_read_timeout=1;")
Expand All @@ -614,15 +614,15 @@ func TestTiKVClientReadTimeout(t *testing.T) {
rows = tk.MustQuery("explain analyze select * from t where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:2.*", explain)

// Test for stale read.
tk.MustExec("set @a=now(6);")
tk.MustExec("set @@tidb_replica_read='closest-replicas';")
rows = tk.MustQuery("explain analyze select * from t as of timestamp(@a) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:2.*", explain)
}

func TestGetMvccByEncodedKeyRegionError(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestExplainFor(t *testing.T) {
buf.WriteString(fmt.Sprintf("%v", v))
}
}
require.Regexp(t, "TableReader_5 10000.00 0 root time:.*, loops:1,( RU:.*,)? cop_task: {num:.*, max:.*, proc_keys:.* rpc_num: 1, rpc_time:.*} data:TableFullScan_4 N/A N/A\n"+
require.Regexp(t, "TableReader_5 10000.00 0 root time:.*, loops:1,( RU:.*,)? cop_task: {num:.*, max:.*, proc_keys:.*num_rpc:1, total_time:.*} data:TableFullScan_4 N/A N/A\n"+
"└─TableFullScan_4 10000.00 0 cop.* table:t1 tikv_task:{time:.*, loops:0} keep order:false, stats:pseudo N/A N/A",
buf.String())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/copr/batch_request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *tikv.RP
start := time.Now()
resp, err = ss.GetClient().SendRequest(ctx, rpcCtx.Addr, req, timout)
if ss.Stats != nil && ss.enableCollectExecutionInfo {
tikv.RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
ss.Stats.RecordRPCRuntimeStats(req.Type, time.Since(start))
}
if err != nil {
cancel()
Expand Down
16 changes: 13 additions & 3 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
req.StoreTp = getEndPointType(task.storeType)
startTime := time.Now()
if worker.kvclient.Stats == nil {
worker.kvclient.Stats = make(map[tikvrpc.CmdType]*tikv.RPCRuntimeStats)
worker.kvclient.Stats = tikv.NewRegionRequestRuntimeStats()
}
// set ReadReplicaScope and TxnScope so that req.IsStaleRead will be true when it's a global scope stale read.
req.ReadReplicaScope = worker.req.ReadReplicaScope
Expand Down Expand Up @@ -1305,10 +1305,16 @@ const (

func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *copTask, bo *Backoffer, resp *coprocessor.Response) {
logStr := fmt.Sprintf("[TIME_COP_PROCESS] resp_time:%s txnStartTS:%d region_id:%d store_addr:%s", costTime, worker.req.StartTs, task.region.GetID(), task.storeAddr)
if worker.kvclient.Stats != nil {
logStr += fmt.Sprintf(" stats:%s", worker.kvclient.Stats.String())
}
if bo.GetTotalSleep() > minLogBackoffTime {
backoffTypes := strings.ReplaceAll(fmt.Sprintf("%v", bo.TiKVBackoffer().GetTypes()), " ", ",")
logStr += fmt.Sprintf(" backoff_ms:%d backoff_types:%s", bo.GetTotalSleep(), backoffTypes)
}
if regionErr := resp.GetRegionError(); regionErr != nil {
logStr += fmt.Sprintf(" region_err:%s", regionErr.String())
}
// resp might be nil, but it is safe to call resp.GetXXX here.
detailV2 := resp.GetExecDetailsV2()
detail := resp.GetExecDetails()
Expand Down Expand Up @@ -1756,7 +1762,11 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
if resp.detail == nil {
resp.detail = new(CopRuntimeStats)
}
resp.detail.Stats = worker.kvclient.Stats
worker.collectCopRuntimeStats(resp.detail, bo, rpcCtx, resp)
}

func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStats, bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) {
copStats.ReqStats = worker.kvclient.Stats
backoffTimes := bo.GetBackoffTimes()
resp.detail.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
Expand Down Expand Up @@ -1796,7 +1806,7 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
// CopRuntimeStats contains execution detail information.
type CopRuntimeStats struct {
execdetails.ExecDetails
tikv.RegionRequestRuntimeStats
ReqStats *tikv.RegionRequestRuntimeStats

CoprCacheHit bool
}
Expand Down
Loading

0 comments on commit 227028a

Please sign in to comment.