Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client-go: attach request source with retry info for coprocessor #46509

Merged
merged 2 commits into from
Sep 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6937,13 +6937,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "608e5c393dcf7fa07a7a360333816dc479b05bad6ad489a4643c9a096e47f5d9",
strip_prefix = "github.com/tikv/client-go/[email protected].20230811033710-8a214402da13",
sha256 = "9cf5877cb0b43d73140e280ad9c80dccd9684e89659a358ee75702469368cf95",
strip_prefix = "github.com/tikv/client-go/[email protected].20230829002846-295094e5b534",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230811033710-8a214402da13.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230811033710-8a214402da13.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230811033710-8a214402da13.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230811033710-8a214402da13.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230829002846-295094e5b534.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230829002846-295094e5b534.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230829002846-295094e5b534.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230829002846-295094e5b534.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20230811033710-8a214402da13
github.com/tikv/client-go/v2 v2.0.8-0.20230829002846-295094e5b534
github.com/tikv/pd/client v0.0.0-20230728033905-31343e006842
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1004,8 +1004,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20230811033710-8a214402da13 h1:oTAPyrDR5UFVhg4SYmHNQ1gHQrwQfBjGGK/zKbz8VcA=
github.com/tikv/client-go/v2 v2.0.8-0.20230811033710-8a214402da13/go.mod h1:J17iHkj8buCLDF7lgKJLX5jq5aGozrbpa7+Ln6g8Xjc=
github.com/tikv/client-go/v2 v2.0.8-0.20230829002846-295094e5b534 h1:QJL3xv/12H69IhRg092GwMvHh4tOV+47d0iYL73/7mM=
github.com/tikv/client-go/v2 v2.0.8-0.20230829002846-295094e5b534/go.mod h1:2cndkbSuokkQ6TvYdcUbSfd0IYXbQx9HfdPEc+r0ezg=
github.com/tikv/pd/client v0.0.0-20230728033905-31343e006842 h1:TwjBJvRx/DJbgMt7Vk5cFO7tG1DZnxR+22S2VmaNGRw=
github.com/tikv/pd/client v0.0.0-20230728033905-31343e006842/go.mod h1:VJwM+qMcQxvGgyu9C6wU7fhjLaALs+odsOvpUMmnhHo=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M=
Expand Down
3 changes: 1 addition & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4347,9 +4347,8 @@ func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNo
if !s.isInternal() {
if txn, _ := s.Txn(false); txn != nil && txn.Valid() {
txn.SetOption(kv.RequestSourceType, stmtLabel)
} else {
s.sessionVars.RequestSourceType = stmtLabel
}
s.sessionVars.RequestSourceType = stmtLabel
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a tiny fix because coprocessor will read request source from the session variables.

return
}
if source := ctx.Value(kv.RequestSourceKey); source != nil {
Expand Down
6 changes: 5 additions & 1 deletion session/test/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,19 +967,23 @@ func TestRequestSource(t *testing.T) {
return interceptor.NewRPCInterceptor("kv-request-source-verify", func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
requestSource := ""
readType := ""
switch r := req.Req.(type) {
case *kvrpcpb.PrewriteRequest:
requestSource = r.GetContext().GetRequestSource()
case *kvrpcpb.CommitRequest:
requestSource = r.GetContext().GetRequestSource()
case *coprocessor.Request:
readType = "-leader" // read request will be attached with read type
requestSource = r.GetContext().GetRequestSource()
case *kvrpcpb.GetRequest:
readType = "-leader" // read request will be attached with read type
requestSource = r.GetContext().GetRequestSource()
case *kvrpcpb.BatchGetRequest:
readType = "-leader" // read request will be attached with read type
requestSource = r.GetContext().GetRequestSource()
}
require.Equal(t, source, requestSource)
require.Equal(t, source+readType, requestSource)
return next(target, req)
}
})
Expand Down
25 changes: 19 additions & 6 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,10 @@ type copTask struct {
busyThreshold time.Duration
meetLockFallback bool

// timeout value for one kv readonly reqeust
// timeout value for one kv readonly request
tidbKvReadTimeout uint64
// firstReadType is used to indicate the type of first read when retrying.
firstReadType string
}

type batchedCopTask struct {
Expand Down Expand Up @@ -1174,12 +1176,16 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
RecordTimeStat: true,
RecordScanStat: true,
TaskId: worker.req.TaskID,
RequestSource: task.requestSource.GetRequestSource(),
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: worker.req.ResourceGroupName,
},
BusyThresholdMs: uint32(task.busyThreshold.Milliseconds()),
})
req.InputRequestSource = task.requestSource.GetRequestSource()
if task.firstReadType != "" {
req.ReadType = task.firstReadType
req.IsRetryRequest = true
}
if worker.req.ResourceGroupTagger != nil {
worker.req.ResourceGroupTagger(req)
}
Expand Down Expand Up @@ -1255,12 +1261,19 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
tidbmetrics.DistSQLCoprRespBodySize.WithLabelValues(storeAddr).Observe(float64(len(copResp.Data)))
}

var remains []*copTask
if worker.req.Paging.Enable {
return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: copResp}, cacheKey, cacheValue, task, ch, costTime)
remains, err = worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: copResp}, cacheKey, cacheValue, task, ch, costTime)
} else {
// Handles the response for non-paging copTask.
remains, err = worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: copResp}, cacheKey, cacheValue, task, ch, nil, costTime)
}

// Handles the response for non-paging copTask.
return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: copResp}, cacheKey, cacheValue, task, ch, nil, costTime)
if req.ReadType != "" {
for _, remain := range remains {
remain.firstReadType = req.ReadType
}
}
return remains, err
}

const (
Expand Down