From 5c602d0b43d73f34ffa8d2bfa27d984c8e22f1e9 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 18 May 2023 18:13:01 +0800 Subject: [PATCH] Use a new way to create client for v2 (#806) Signed-off-by: iosmanthus --- rawkv/rawkv.go | 39 ++++++++++++++++++++------------------- tikv/kv.go | 8 +++++++- txnkv/client.go | 21 +++++++++++++++++---- 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index ddae7289b..f851a3ed4 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -200,32 +200,33 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt) o(opt) } - // Use an unwrapped PDClient to obtain keyspace meta. - pdCli, err := pd.NewClientWithContext(ctx, pdAddrs, pd.SecurityOption{ - CAPath: opt.security.ClusterSSLCA, - CertPath: opt.security.ClusterSSLCert, - KeyPath: opt.security.ClusterSSLKey, - }, opt.pdOptions...) - if err != nil { - return nil, errors.WithStack(err) - } - - // Build a CodecPDClient - var codecCli *tikv.CodecPDClient + var ( + pdClient pd.Client + codecCli *tikv.CodecPDClient + err error + ) switch opt.apiVersion { case kvrpcpb.APIVersion_V1, kvrpcpb.APIVersion_V1TTL: - codecCli = locate.NewCodecPDClient(tikv.ModeRaw, pdCli) + pdClient, err = tikv.NewPDClientWithAPIContext(pdAddrs, pd.NewAPIContextV1()) + if err != nil { + return nil, errors.WithStack(err) + } + codecCli = locate.NewCodecPDClient(tikv.ModeRaw, pdClient) case kvrpcpb.APIVersion_V2: - codecCli, err = tikv.NewCodecPDClientWithKeyspace(tikv.ModeRaw, pdCli, opt.keyspace) + pdClient, err = tikv.NewPDClientWithAPIContext(pdAddrs, pd.NewAPIContextV2(opt.keyspace)) + if err != nil { + return nil, errors.WithStack(err) + } + codecCli, err = tikv.NewCodecPDClientWithKeyspace(tikv.ModeRaw, pdClient, opt.keyspace) if err != nil { return nil, err } default: - return nil, errors.Errorf("unknown api version: %d", opt.apiVersion) + return nil, errors.Errorf("unknown API version: %d", opt.apiVersion) } - pdCli = codecCli + pdClient = codecCli rpcCli := client.NewRPCClient( client.WithSecurity(opt.security), @@ -235,9 +236,9 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt) return &Client{ apiVersion: opt.apiVersion, - clusterID: pdCli.GetClusterID(ctx), - regionCache: locate.NewRegionCache(pdCli), - pdClient: pdCli, + clusterID: pdClient.GetClusterID(ctx), + regionCache: locate.NewRegionCache(pdClient), + pdClient: pdClient, rpcClient: rpcCli, }, nil } diff --git a/tikv/kv.go b/tikv/kv.go index a60cbb4c7..c2e7aee61 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -235,9 +235,15 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl // NewPDClient returns an unwrapped pd client. func NewPDClient(pdAddrs []string) (pd.Client, error) { + return NewPDClientWithAPIContext(pdAddrs, pd.NewAPIContextV1()) +} + +// NewPDClientWithAPIContext returns an unwrapped pd client with API context. +func NewPDClientWithAPIContext(pdAddrs []string, apiContext pd.APIContext) (pd.Client, error) { cfg := config.GetGlobalConfig() // init pd-client - pdCli, err := pd.NewClient( + pdCli, err := pd.NewClientWithAPIContext( + context.Background(), apiContext, pdAddrs, pd.SecurityOption{ CAPath: cfg.Security.ClusterSSLCA, CertPath: cfg.Security.ClusterSSLCert, diff --git a/txnkv/client.go b/txnkv/client.go index 655c32e18..5fc98c2c7 100644 --- a/txnkv/client.go +++ b/txnkv/client.go @@ -26,6 +26,7 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/transaction" "github.com/tikv/client-go/v2/util" + pd "github.com/tikv/pd/client" ) // Client is a txn client. @@ -70,12 +71,24 @@ func NewClient(pdAddrs []string, opts ...ClientOpt) (*Client, error) { for _, o := range opts { o(opt) } - // Use an unwrapped PDClient to obtain keyspace meta. - pdClient, err := tikv.NewPDClient(pdAddrs) + + var ( + pdClient pd.Client + apiContext pd.APIContext + err error + ) + switch opt.apiVersion { + case kvrpcpb.APIVersion_V1: + apiContext = pd.NewAPIContextV1() + case kvrpcpb.APIVersion_V2: + apiContext = pd.NewAPIContextV2(opt.keyspaceName) + default: + return nil, errors.Errorf("unknown API version: %d", opt.apiVersion) + } + pdClient, err = tikv.NewPDClientWithAPIContext(pdAddrs, apiContext) if err != nil { return nil, errors.WithStack(err) } - pdClient = util.InterceptedPDClient{Client: pdClient} // Construct codec from options. @@ -89,7 +102,7 @@ func NewClient(pdAddrs []string, opts ...ClientOpt) (*Client, error) { return nil, err } default: - return nil, errors.Errorf("unknown api version: %d", opt.apiVersion) + return nil, errors.Errorf("unknown API version: %d", opt.apiVersion) } pdClient = codecCli