Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] support GetRegion via CSE sync_region API #745

Open
wants to merge 59 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
97cea40
refactor fanout logic of cse region client
iosmanthus Mar 28, 2023
4da4647
add todos about region info resp from cse
iosmanthus Mar 28, 2023
7ab0132
support get store for CSEClient
iosmanthus Mar 28, 2023
de2f0f4
update comment for cse test
iosmanthus Mar 28, 2023
5bd7f71
remove logs
iosmanthus Mar 29, 2023
4054b80
Merge branch 'master' of github.com:tikv/client-go into cse-region-cl…
iosmanthus Mar 31, 2023
05b48d2
add store prober
iosmanthus Mar 31, 2023
e05be61
fix compile
iosmanthus Mar 31, 2023
a9ad5f5
remove logs
iosmanthus Mar 31, 2023
e3a2ba2
add cb for Fallback and CSEClient
iosmanthus Apr 4, 2023
70cc79e
goimports -w ./
iosmanthus Apr 4, 2023
f8201f7
introduce asyncBreaker for Fallback and CSEClient
iosmanthus Apr 6, 2023
e2fdd40
Merge branch 'master' of github.com:tikv/client-go into cse-region-cl…
iosmanthus Apr 6, 2023
ccbfa16
bump kvproto
iosmanthus Apr 6, 2023
bca479e
goimports -w ./
iosmanthus Apr 6, 2023
f6b9f97
fix lint
iosmanthus Apr 6, 2023
e8cf959
go mod tidy for integration tests
iosmanthus Apr 6, 2023
8b68569
introduce EnableCSERegionClient for TiKVClient
iosmanthus Apr 6, 2023
770e723
refine interface comments and options
iosmanthus Apr 7, 2023
01caebd
bump kvproto to master
iosmanthus Apr 7, 2023
055f177
Merge branch 'master' into cse-region-client
iosmanthus Apr 10, 2023
e1b0313
execdetails: add RRU/WRU interface of RURuntimeStats (#768)
nolouch Apr 19, 2023
6de096c
fix encoding of mvcc get by key (#775)
iosmanthus Apr 20, 2023
94baaf7
Merge branch 'master' of github.com:tikv/client-go into cse-region-cl…
iosmanthus Apr 20, 2023
4011b67
fix panic while collects buckets (#774)
iosmanthus Apr 20, 2023
f53c17d
add const label for metrics (#781)
zeminzhou Apr 26, 2023
fcf9595
force to sync the current region while GetPrevRegion (#783)
iosmanthus Apr 26, 2023
716c8a5
encode store batch tasks (#785)
iosmanthus Apr 27, 2023
4d2f943
cse: decode batch cop task (#798)
iosmanthus May 12, 2023
31152f5
*: Add GetMinTS (#801)
rleungx May 17, 2023
d5baf4c
Use a new way to create client for v2 (#806)
rleungx May 18, 2023
a2f93a3
Count only one tiflash replica in disaggregated mode. (#805)
JinheLin May 22, 2023
5f10ed1
cse: catch up master (#810)
iosmanthus May 22, 2023
35937f2
Merge branch 'master' of github.com:tikv/client-go into cse-region-cl…
iosmanthus May 22, 2023
d8ce633
Merge branch 'cse-region-client' of github.com:tikv/client-go into cs…
iosmanthus May 22, 2023
a6a7be2
Add get min ts implementation (#809)
rleungx May 23, 2023
1d62208
*: fix the lint and add CI (#813)
rleungx May 23, 2023
388baf3
keyspace gc cse client (#803)
ystaticy May 25, 2023
4499cc5
support remote coprocessor (#808)
coocood May 29, 2023
a07600f
cse: add replica number (#824)
nolouch May 31, 2023
390b11b
add logs for cse region client (#838)
iosmanthus Jun 15, 2023
d2d449f
filter out removed node state (#849)
iosmanthus Jun 21, 2023
a246fae
polish comments for cse region client (#850)
iosmanthus Jun 21, 2023
136f833
*: update pd client version (#840)
lhy1024 Jun 25, 2023
d5a7044
refactor cse region client as plugin (#853)
iosmanthus Jun 29, 2023
43fcadf
rebase master for cse-region-client (#854)
iosmanthus Jun 29, 2023
1d58034
fix conflicts with master for cse-region-client (#856)
iosmanthus Jun 29, 2023
f04046a
Merge branch 'master' of github.com:tikv/client-go into cse-region-cl…
iosmanthus Jun 29, 2023
19ef437
resource_control: bypass some internal urgent request (#884)
nolouch Jul 14, 2023
c22bb29
pdclient: get all keyspace (#892)
ystaticy Jul 17, 2023
b27cb07
resourcecontrol: fix nil pointer (#900)
nolouch Jul 21, 2023
2ad441f
prefix safepoint kv with keyspace name (#928)
AmoebaProtozoa Aug 7, 2023
b67add4
merge master into cse region client (#939)
iosmanthus Aug 11, 2023
e93b07c
Merge branch 'master' of github.com:tikv/client-go into cse-region-cl…
iosmanthus Aug 11, 2023
a29e95d
collecting the RU information by pasing point through context.Value (…
zeminzhou Nov 6, 2023
e1d4f84
increase large transaction preSplitSizeThreashold (#1059)
coocood Nov 15, 2023
0041484
merge master for cse region client (#1083)
iosmanthus Dec 20, 2023
4ffb53a
Merge branch 'master' of github.com:tikv/client-go into cse-region-cl…
iosmanthus Dec 20, 2023
aaa66ef
Merge branch 'master' into cse-region-client
iosmanthus Dec 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Compatibility Test

on:
push:
branches: [ master ]
branches: [ master, cse-region-client ]
pull_request:
branches: [ master ]
branches: [ master, cse-region-client ]

jobs:

Expand All @@ -26,6 +26,7 @@ jobs:
with:
repository: pingcap/tidb
path: tidb
ref: release-7.1

- name: Check build
run: |
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Integration Test

on:
push:
branches: [ master ]
branches: [ master, cse-region-client ]
pull_request:
branches: [ master ]
branches: [ master, cse-region-client ]

jobs:

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Unit Test

on:
push:
branches: [ master ]
branches: [ master, cse-region-client ]
pull_request:
branches: [ master ]
branches: [ master, cse-region-client ]

jobs:
test:
Expand Down
3 changes: 3 additions & 0 deletions config/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type TiKVClient struct {
// TTLRefreshedTxnSize controls whether a transaction should update its TTL or not.
TTLRefreshedTxnSize int64 `toml:"ttl-refreshed-txn-size" json:"ttl-refreshed-txn-size"`
ResolveLockLiteThreshold uint64 `toml:"resolve-lock-lite-threshold" json:"resolve-lock-lite-threshold"`

// RemoteCoprocessorAddr is the address of the remote coprocessor.
RemoteCoprocessorAddr string `toml:"remote-coprocessor-addr" json:"remote-coprocessor-addr"`
}

// AsyncCommit is the config for the async commit feature. The switch to enable it is a system variable.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.15.1
github.com/prometheus/client_model v0.3.0
github.com/prometheus/client_model v0.4.0
github.com/stretchr/testify v1.8.2
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a
github.com/tikv/pd/client v0.0.0-20231219031951-25f48f0bdd27
Expand Down
7 changes: 2 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
Expand Down Expand Up @@ -86,8 +85,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI=
github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY=
github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM=
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
Expand All @@ -112,8 +111,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2 h1:7fnKwFC9pgiOolvnUnquEAb60liIpna+0hFRkopaOSg=
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/tikv/pd/client v0.0.0-20231219031951-25f48f0bdd27 h1:U8jPVwFu9Zu8tXYlmOxO/Zv3OcsgoJ/COSwMNWvED9c=
github.com/tikv/pd/client v0.0.0-20231219031951-25f48f0bdd27/go.mod h1:AwjTSpM7CgAynYwB6qTG5R5fVC9/eXlQXiTO6zDL1HI=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
Expand Down
25 changes: 24 additions & 1 deletion internal/apicodec/codec_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error)
req.Req = &r
case tikvrpc.CmdMvccGetByKey:
r := *req.MvccGetByKey()
r.Key = c.EncodeRegionKey(r.Key)
r.Key = c.EncodeKey(r.Key)
req.Req = &r
case tikvrpc.CmdSplitRegion:
r := *req.SplitRegion()
Expand Down Expand Up @@ -571,6 +571,11 @@ func (c *codecV2) DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (
if err != nil {
return nil, err
}
// decodes batch responses
r.BatchResponses, err = c.decodeBatchTaskResps(r.BatchResponses)
if err != nil {
return nil, err
}
case tikvrpc.CmdCopStream:
return nil, errors.New("streaming coprocessor is not supported yet")
case tikvrpc.CmdBatchCop, tikvrpc.CmdMPPTask:
Expand Down Expand Up @@ -1001,3 +1006,21 @@ func (c *codecV2) DecodeBucketKeys(keys [][]byte) ([][]byte, error) {
}
return ks, nil
}

func (c *codecV2) decodeBatchTaskResps(responses []*coprocessor.StoreBatchTaskResponse) ([]*coprocessor.StoreBatchTaskResponse, error) {
for _, r := range responses {
if r == nil {
continue
}
var err error
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Locked, err = c.decodeLockInfo(r.Locked)
if err != nil {
return nil, err
}
}
return responses, nil
}
40 changes: 32 additions & 8 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/config/retry"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/client"
Expand Down Expand Up @@ -1275,7 +1276,7 @@ func (s *RegionRequestSender) getRPCContext(
opts ...StoreSelectorOption,
) (*RPCContext, error) {
switch et {
case tikvrpc.TiKV:
case tikvrpc.TiKV, tikvrpc.TiKVRemoteCoprocessor:
if s.replicaSelector == nil {
selector, err := newReplicaSelector(s.regionCache, regionID, req, opts...)
if selector == nil || err != nil {
Expand Down Expand Up @@ -1607,6 +1608,32 @@ func fetchRespInfo(resp *tikvrpc.Response) string {
return extraInfo
}

func (s *RegionRequestSender) countReplicaNumber(peers []*metapb.Peer) int {
isTiFlashWriteNode := func(storeId uint64) bool {
store := s.regionCache.getStoreByStoreID(storeId)
engine, _ := store.GetLabelValue("engine")
engineRole, _ := store.GetLabelValue("engine_role")
return engine == "tiflash" && engineRole == "write"
}
// In disaggregated-mode(tiflash write-node), only count 1 replica for tiflash, no matter how many tiflash write-nodes there are.
replicaNumber := 0
hasTiFlashWriteNode := false
for _, peer := range peers {
role := peer.GetRole()
if role == metapb.PeerRole_Voter {
replicaNumber++
} else if role == metapb.PeerRole_Learner {
if !isTiFlashWriteNode(peer.StoreId) {
replicaNumber++
} else if !hasTiFlashWriteNode {
hasTiFlashWriteNode = true
replicaNumber++
}
}
}
return replicaNumber
}

func (s *RegionRequestSender) sendReqToRegion(
bo *retry.Backoffer, rpcCtx *RPCContext, req *tikvrpc.Request, timeout time.Duration,
) (resp *tikvrpc.Response, retry bool, err error) {
Expand Down Expand Up @@ -1634,17 +1661,14 @@ func (s *RegionRequestSender) sendReqToRegion(
req.ForwardedHost = rpcCtx.Addr
sendToAddr = rpcCtx.ProxyAddr
}
if req.StoreTp == tikvrpc.TiKVRemoteCoprocessor {
sendToAddr = config.GetGlobalConfig().TiKVClient.RemoteCoprocessorAddr
}

// Count the replica number as the RU cost factor.
req.ReplicaNumber = 1
if rpcCtx.Meta != nil && len(rpcCtx.Meta.GetPeers()) > 0 {
req.ReplicaNumber = 0
for _, peer := range rpcCtx.Meta.GetPeers() {
role := peer.GetRole()
if role == metapb.PeerRole_Voter || role == metapb.PeerRole_Learner {
req.ReplicaNumber++
}
}
req.ReplicaNumber = int64(s.countReplicaNumber(rpcCtx.Meta.GetPeers()))
}

var sessionID uint64
Expand Down
1 change: 1 addition & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ func (s *mockTikvGrpcServer) EstablishMPPConnection(*mpp.EstablishMPPConnectionR
func (s *mockTikvGrpcServer) CancelMPPTask(context.Context, *mpp.CancelTaskRequest) (*mpp.CancelTaskResponse, error) {
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) Raft(tikvpb.Tikv_RaftServer) error {
return errors.New("unreachable")
}
Expand Down
9 changes: 4 additions & 5 deletions internal/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,6 @@ func (c *pdClient) GetTS(context.Context) (int64, int64, error) {
return tsMu.physicalTS, tsMu.logicalTS, nil
}

// GetMinTS returns the minimal ts.
func (c *pdClient) GetMinTS(ctx context.Context) (int64, int64, error) {
return 0, 0, nil
}

func (c *pdClient) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) {
panic("unimplemented")
}
Expand All @@ -174,6 +169,10 @@ func (c *pdClient) GetLocalTSAsync(ctx context.Context, dcLocation string) pd.TS
return c.GetTSAsync(ctx)
}

func (c *pdClient) GetMinTS(ctx context.Context) (int64, int64, error) {
return 0, 0, nil
}

func (c *pdClient) SetExternalTimestamp(ctx context.Context, newTimestamp uint64) error {
p, l, err := c.GetTS(ctx)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Oracle interface {

GetExternalTimestamp(ctx context.Context) (uint64, error)
SetExternalTimestamp(ctx context.Context, ts uint64) error

GetMinTimestamp(ctx context.Context) (uint64, error)
}

// Future is a future which promises to return a timestamp.
Expand Down
10 changes: 10 additions & 0 deletions oracle/oracles/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ func (l *localOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint6
return ts, nil
}

func (l *localOracle) GetMinTimestamp(ctx context.Context) (uint64, error) {
l.Lock()
defer l.Unlock()
now := time.Now()
if l.hook != nil {
now = l.hook.currentTime
}
return oracle.GoTimeToTS(now), nil
}

func (l *localOracle) GetTimestampAsync(ctx context.Context, _ *oracle.Option) oracle.Future {
return &future{
ctx: ctx,
Expand Down
11 changes: 11 additions & 0 deletions oracle/oracles/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ func (o *MockOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint64
return ts, nil
}

// GetMinTimestamp implements oracle.Oracle interface.
func (o *MockOracle) GetMinTimestamp(ctx context.Context) (uint64, error) {
o.RLock()
defer o.RUnlock()

if o.stop {
return 0, errors.WithStack(errStopped)
}
return oracle.GoTimeToTS(time.Now().Add(o.offset)), nil
}

// GetStaleTimestamp implements oracle.Oracle interface.
func (o *MockOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (ts uint64, err error) {
return oracle.GoTimeToTS(time.Now().Add(-time.Second * time.Duration(prevSecond))), nil
Expand Down
22 changes: 21 additions & 1 deletion oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type lastTSO struct {

// NewPdOracle create an Oracle that uses a pd client source.
// Refer https://github.com/tikv/pd/blob/master/client/client.go for more details.
// PdOracle mantains `lastTS` to store the last timestamp got from PD server. If
// PdOracle maintains `lastTS` to store the last timestamp got from PD server. If
// `GetTimestamp()` is not called after `updateInterval`, it will be called by
// itself to keep up with the timestamp on PD server.
func NewPdOracle(pdClient pd.Client, updateInterval time.Duration) (oracle.Oracle, error) {
Expand Down Expand Up @@ -108,6 +108,11 @@ func (o *pdOracle) GetTimestamp(ctx context.Context, opt *oracle.Option) (uint64
return ts, nil
}

// GetMinTimestamp gets a minimum timestamp for all keyspace groups.
func (o *pdOracle) GetMinTimestamp(ctx context.Context) (uint64, error) {
return o.getMinTimestamp(ctx)
}

type tsFuture struct {
pd.TSFuture
o *pdOracle
Expand Down Expand Up @@ -159,6 +164,21 @@ func (o *pdOracle) getTimestamp(ctx context.Context, txnScope string) (uint64, e
return oracle.ComposeTS(physical, logical), nil
}

func (o *pdOracle) getMinTimestamp(ctx context.Context) (uint64, error) {
now := time.Now()

physical, logical, err := o.c.GetMinTS(ctx)
if err != nil {
return 0, errors.WithStack(err)
}
dist := time.Since(now)
if dist > slowDist {
logutil.Logger(ctx).Warn("get minimum timestamp too slow",
zap.Duration("cost time", dist))
}
return oracle.ComposeTS(physical, logical), nil
}

func (o *pdOracle) getArrivalTimestamp() uint64 {
return oracle.GoTimeToTS(time.Now())
}
Expand Down
39 changes: 20 additions & 19 deletions rawkv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,32 +200,33 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt)
o(opt)
}

// Use an unwrapped PDClient to obtain keyspace meta.
pdCli, err := pd.NewClientWithContext(ctx, pdAddrs, pd.SecurityOption{
CAPath: opt.security.ClusterSSLCA,
CertPath: opt.security.ClusterSSLCert,
KeyPath: opt.security.ClusterSSLKey,
}, opt.pdOptions...)
if err != nil {
return nil, errors.WithStack(err)
}

// Build a CodecPDClient
var codecCli *tikv.CodecPDClient
var (
pdClient pd.Client
codecCli *tikv.CodecPDClient
err error
)

switch opt.apiVersion {
case kvrpcpb.APIVersion_V1, kvrpcpb.APIVersion_V1TTL:
codecCli = locate.NewCodecPDClient(tikv.ModeRaw, pdCli)
pdClient, err = tikv.NewPDClientWithAPIContext(pdAddrs, pd.NewAPIContextV1())
if err != nil {
return nil, errors.WithStack(err)
}
codecCli = locate.NewCodecPDClient(tikv.ModeRaw, pdClient)
case kvrpcpb.APIVersion_V2:
codecCli, err = tikv.NewCodecPDClientWithKeyspace(tikv.ModeRaw, pdCli, opt.keyspace)
pdClient, err = tikv.NewPDClientWithAPIContext(pdAddrs, pd.NewAPIContextV2(opt.keyspace))
if err != nil {
return nil, errors.WithStack(err)
}
codecCli, err = tikv.NewCodecPDClientWithKeyspace(tikv.ModeRaw, pdClient, opt.keyspace)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("unknown api version: %d", opt.apiVersion)
return nil, errors.Errorf("unknown API version: %d", opt.apiVersion)
}

pdCli = codecCli
pdClient = codecCli

rpcCli := client.NewRPCClient(
client.WithSecurity(opt.security),
Expand All @@ -235,9 +236,9 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt)

return &Client{
apiVersion: opt.apiVersion,
clusterID: pdCli.GetClusterID(ctx),
regionCache: locate.NewRegionCache(pdCli),
pdClient: pdCli,
clusterID: pdClient.GetClusterID(ctx),
regionCache: locate.NewRegionCache(pdClient),
pdClient: pdClient,
rpcClient: rpcCli,
}, nil
}
Expand Down
Loading
Loading