Skip to content

Commit

Permalink
client-go: attach request source with retry info for coprocessor (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and you06 committed Sep 18, 2023
1 parent b9fd711 commit 778b007
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .bazelversion
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5.3.2
6.1.1
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3703,8 +3703,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:u21KmROEJmJL+Wkqz4cUe2BpjmcobqsYVNGEbVWKmnU=",
version = "v2.0.4-0.20230829002742-dfae543556aa",
sum = "h1:19SpfhQbXO+rVqZbyWoU08CprQtUVIMhDMQ0R7zajAw=",
version = "v2.0.4-0.20230915073153-7f7f0549611a",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
3 changes: 1 addition & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4173,9 +4173,8 @@ func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNo
if !s.isInternal() {
if txn, _ := s.Txn(false); txn != nil && txn.Valid() {
txn.SetOption(kv.RequestSourceType, stmtLabel)
} else {
s.sessionVars.RequestSourceType = stmtLabel
}
s.sessionVars.RequestSourceType = stmtLabel
return
}
if source := ctx.Value(kv.RequestSourceKey); source != nil {
Expand Down
27 changes: 21 additions & 6 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ type copTask struct {

// timeout value for one kv readonly request
tikvClientReadTimeout uint64
// timeout value for one kv readonly reqeust
tidbKvReadTimeout uint64
// firstReadType is used to indicate the type of first read when retrying.
firstReadType string
}

type batchedCopTask struct {
Expand Down Expand Up @@ -1022,13 +1026,17 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
RecordTimeStat: true,
RecordScanStat: true,
TaskId: worker.req.TaskID,
RequestSource: task.requestSource.GetRequestSource(),
})
req.InputRequestSource = task.requestSource.GetRequestSource()
if task.firstReadType != "" {
req.ReadType = task.firstReadType
req.IsRetryRequest = true
}
if worker.req.ResourceGroupTagger != nil {
worker.req.ResourceGroupTagger(req)
}
timeout := tikv.ReadTimeoutMedium
if task.tikvClientReadTimeout> 0 {
if task.tikvClientReadTimeout > 0 {
timeout = time.Duration(task.tikvClientReadTimeout) * time.Millisecond
}
req.StoreTp = getEndPointType(task.storeType)
Expand Down Expand Up @@ -1073,12 +1081,19 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
tidbmetrics.DistSQLCoprRespBodySize.WithLabelValues(storeAddr).Observe(float64(len(copResp.Data)))
}

var remains []*copTask
if worker.req.Paging.Enable {
return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: copResp}, cacheKey, cacheValue, task, ch, costTime)
remains, err = worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: copResp}, cacheKey, cacheValue, task, ch, costTime)
} else {
// Handles the response for non-paging copTask.
remains, err = worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: copResp}, cacheKey, cacheValue, task, ch, nil, costTime)
}

// Handles the response for non-paging copTask.
return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: copResp}, cacheKey, cacheValue, task, ch, nil, costTime)
if req.ReadType != "" {
for _, remain := range remains {
remain.firstReadType = req.ReadType
}
}
return remains, err
}

const (
Expand Down

0 comments on commit 778b007

Please sign in to comment.