Skip to content

Commit

Permalink
client: add NewClientWithKeyspaceName for client (tikv#6380)
Browse files Browse the repository at this point in the history
ref tikv#5895

Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Aug 2, 2023
1 parent d826867 commit be36c04
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 24 deletions.
45 changes: 43 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,6 @@ func NewClientWithContext(ctx context.Context, svrAddrs []string, security Secur

// NewClientWithKeyspace creates a client with context and the specified keyspace id.
func NewClientWithKeyspace(ctx context.Context, keyspaceID uint32, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
log.Info("[pd] create pd client with endpoints and keyspace", zap.Strings("pd-address", svrAddrs), zap.Uint32("keyspace-id", keyspaceID))

tlsCfg := &tlsutil.TLSConfig{
CAPath: security.CAPath,
CertPath: security.CertPath,
Expand Down Expand Up @@ -351,6 +349,49 @@ func NewClientWithKeyspace(ctx context.Context, keyspaceID uint32, svrAddrs []st
return c, nil
}

// NewClientWithKeyspaceName creates a client with context and the specified keyspace name.
func NewClientWithKeyspaceName(ctx context.Context, keyspace string, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
log.Info("[pd] create pd client with endpoints and keyspace", zap.Strings("pd-address", svrAddrs), zap.String("keyspace", keyspace))

tlsCfg := &tlsutil.TLSConfig{
CAPath: security.CAPath,
CertPath: security.CertPath,
KeyPath: security.KeyPath,

SSLCABytes: security.SSLCABytes,
SSLCertBytes: security.SSLCertBytes,
SSLKEYBytes: security.SSLKEYBytes,
}

clientCtx, clientCancel := context.WithCancel(ctx)
c := &client{
updateTokenConnectionCh: make(chan struct{}, 1),
ctx: clientCtx,
cancel: clientCancel,
svrUrls: addrsToUrls(svrAddrs),
tlsCfg: tlsCfg,
option: newOption(),
}

// Inject the client options.
for _, opt := range opts {
opt(c)
}

c.pdSvcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, c.setServiceMode, c.svrUrls, c.tlsCfg, c.option)
if err := c.setup(); err != nil {
c.cancel()
return nil, err
}
keyspaceMeta, err := c.LoadKeyspace(context.TODO(), keyspace)
// Here we ignore ENTRY_NOT_FOUND error and it will set the keyspaceID to 0.
if err != nil && !strings.Contains(err.Error(), "ENTRY_NOT_FOUND") {
return nil, err
}
c.keyspaceID = keyspaceMeta.GetId()
return c, nil
}

func (c *client) setup() error {
// Init the client base.
if err := c.pdSvcDiscovery.Init(); err != nil {
Expand Down
13 changes: 1 addition & 12 deletions server/keyspace_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ func (s *KeyspaceServer) LoadKeyspace(_ context.Context, request *keyspacepb.Loa
if err := s.validateRequest(request.GetHeader()); err != nil {
return nil, err
}
rc := s.GetRaftCluster()
if rc == nil {
return &keyspacepb.LoadKeyspaceResponse{Header: s.notBootstrappedHeader()}, nil
}

manager := s.GetKeyspaceManager()
meta, err := manager.LoadKeyspace(request.GetName())
Expand All @@ -77,10 +73,6 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques
if err := s.validateRequest(request.GetHeader()); err != nil {
return err
}
rc := s.GetRaftCluster()
if rc == nil {
return stream.Send(&keyspacepb.WatchKeyspacesResponse{Header: s.notBootstrappedHeader()})
}

ctx, cancel := context.WithCancel(s.Context())
defer cancel()
Expand Down Expand Up @@ -161,10 +153,7 @@ func (s *KeyspaceServer) UpdateKeyspaceState(_ context.Context, request *keyspac
if err := s.validateRequest(request.GetHeader()); err != nil {
return nil, err
}
rc := s.GetRaftCluster()
if rc == nil {
return &keyspacepb.UpdateKeyspaceStateResponse{Header: s.notBootstrappedHeader()}, nil
}

manager := s.GetKeyspaceManager()
meta, err := manager.UpdateKeyspaceStateByID(request.GetId(), request.GetState(), time.Now().Unix())
if err != nil {
Expand Down
10 changes: 1 addition & 9 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
bs "github.com/tikv/pd/pkg/basicserver"
rm "github.com/tikv/pd/pkg/mcs/resource_manager/server"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
)
Expand All @@ -51,14 +50,7 @@ func InitLogger(cfg *tso.Config) (err error) {

// SetupClientWithKeyspace creates a TSO client for test.
func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
cli, err := pd.NewClientWithKeyspace(ctx, utils.DefaultKeyspaceID, endpoints, pd.SecurityOption{}, opts...)
re.NoError(err)
return cli
}

// SetupClient creates a TSO client for test.
func SetupClient(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
cli, err := pd.NewClientWithContext(ctx, endpoints, pd.SecurityOption{}, opts...)
cli, err := pd.NewClientWithKeyspaceName(ctx, "", endpoints, pd.SecurityOption{}, opts...)
re.NoError(err)
return cli
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) {
re.NoError(err)
leaderName := cluster.WaitLeader()
pdLeader := cluster.GetServer(leaderName)
backendEndpoints := pdLeader.GetAddr()
re.NoError(pdLeader.BootstrapCluster())
backendEndpoints := pdLeader.GetAddr()
client := pdLeader.GetEtcdClient()
if isAPIServiceMode {
re.Equal(0, getEtcdTimestampKeyNum(re, client))
Expand Down

0 comments on commit be36c04

Please sign in to comment.