Skip to content

Commit

Permalink
Add and initialize cluster id in the TSO service (#6138)
Browse files Browse the repository at this point in the history
ref #5836

Add and initialize cluster id in the TSO service.
PD/API service discovery gets cluster id from the API service.
It then passes the cluster id to the TSO service because both share the same cluster id.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored Mar 10, 2023
1 parent f850aab commit 2296562
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 23 deletions.
30 changes: 19 additions & 11 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func NewClient(svrAddrs []string, security SecurityOption, opts ...ClientOption)
// NewClientWithContext creates a PD client with context.
func NewClientWithContext(ctx context.Context, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", svrAddrs))
c, clientCtx, clientCancel, tlsCfg := createClient(ctx, &security)
c, clientCtx, clientCancel, tlsCfg := createClient(ctx, 0, &security)
// Inject the client options.
for _, opt := range opts {
opt(c)
Expand All @@ -287,6 +287,10 @@ func NewClientWithContext(ctx context.Context, svrAddrs []string, security Secur
c.cancel()
return nil, err
}
if err := c.tsoClient.setup(); err != nil {
c.cancel()
return nil, err
}
return c, nil
}

Expand All @@ -296,24 +300,28 @@ func NewClientWithContext(ctx context.Context, svrAddrs []string, security Secur
// Before that, internal tools call this function to use mcs service.
func NewTSOClientWithContext(ctx context.Context, keyspaceID uint32, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
log.Info("[tso] create tso client with endpoints", zap.Strings("pd(api)-address", svrAddrs))
c, clientCtx, clientCancel, tlsCfg := createClient(ctx, &security)
c, clientCtx, clientCancel, tlsCfg := createClient(ctx, keyspaceID, &security)
// Inject the client options.
for _, opt := range opts {
opt(c)
}

c.keyspaceID = keyspaceID
c.svcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option)
tsoSvcDiscovery := newTSOServiceDiscovery(clientCtx, clientCancel, &c.wg, MetaStorageClient(c), keyspaceID, addrsToUrls(svrAddrs), tlsCfg, c.option)
c.tsoClient = newTSOClient(clientCtx, clientCancel, &c.wg, c.option, c.keyspaceID, tsoSvcDiscovery, tsoSvcDiscovery.(tsoAllocatorEventSource), &tsoTSOStreamBuilderFactory{})
if err := c.setup(); err != nil {
c.cancel()
return nil, err
}

tsoSvcDiscovery := newTSOServiceDiscovery(clientCtx, clientCancel, &c.wg, MetaStorageClient(c), c.GetClusterID(c.ctx), keyspaceID, addrsToUrls(svrAddrs), tlsCfg, c.option)
c.tsoClient = newTSOClient(clientCtx, clientCancel, &c.wg, c.option, c.keyspaceID, tsoSvcDiscovery, tsoSvcDiscovery.(tsoAllocatorEventSource), &tsoTSOStreamBuilderFactory{})
if err := c.tsoClient.setup(); err != nil {
c.cancel()
return nil, err
}
return c, nil
}

func createClient(ctx context.Context, security *SecurityOption) (*client, context.Context, context.CancelFunc, *tlsutil.TLSConfig) {
func createClient(ctx context.Context, keyspaceID uint32, security *SecurityOption) (*client, context.Context, context.CancelFunc, *tlsutil.TLSConfig) {
tlsCfg := &tlsutil.TLSConfig{
CAPath: security.CAPath,
CertPath: security.CertPath,
Expand All @@ -329,6 +337,7 @@ func createClient(ctx context.Context, security *SecurityOption) (*client, conte
updateTokenConnectionCh: make(chan struct{}, 1),
ctx: clientCtx,
cancel: clientCancel,
keyspaceID: keyspaceID,
option: newOption(),
}

Expand All @@ -350,8 +359,7 @@ func (c *client) setup() error {
// Start the daemons.
c.wg.Add(1)
go c.leaderCheckLoop()

return c.tsoClient.setup()
return nil
}

func (c *client) Close() {
Expand All @@ -376,8 +384,8 @@ func (c *client) scheduleUpdateTokenConnection() {
}

// GetClusterID returns the ClusterID.
func (c *client) GetClusterID(ctx context.Context) uint64 {
return c.svcDiscovery.GetClusterID(ctx)
func (c *client) GetClusterID(context.Context) uint64 {
return c.svcDiscovery.GetClusterID()
}

// GetLeaderAddr returns the leader address.
Expand Down Expand Up @@ -971,7 +979,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...R

func (c *client) requestHeader() *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: c.svcDiscovery.GetClusterID(c.ctx),
ClusterId: c.svcDiscovery.GetClusterID(),
}
}

Expand Down
4 changes: 2 additions & 2 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type ServiceDiscovery interface {
// Close releases all resources
Close()
// GetClusterID returns the ID of the cluster
GetClusterID(context.Context) uint64
GetClusterID() uint64
// GetURLs returns the URLs of the servers.
GetURLs() []string
// GetServingEndpointClientConn returns the grpc client connection of the serving endpoint
Expand Down Expand Up @@ -222,7 +222,7 @@ func (c *pdServiceDiscovery) Close() {
}

// GetClusterID returns the ClusterID.
func (c *pdServiceDiscovery) GetClusterID(context.Context) uint64 {
func (c *pdServiceDiscovery) GetClusterID() uint64 {
return c.clusterID
}

Expand Down
2 changes: 1 addition & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func (c *tsoClient) processTSORequests(stream tsoStream, dcLocation string, tbc

requests := tbc.getCollectedRequests()
count := int64(len(requests))
physical, logical, suffixBits, err := stream.processRequests(c.svcDiscovery.GetClusterID(c.ctx), dcLocation, requests, tbc.batchStartTime)
physical, logical, suffixBits, err := stream.processRequests(c.svcDiscovery.GetClusterID(), dcLocation, requests, tbc.batchStartTime)
if err != nil {
c.finishTSORequest(requests, 0, 0, 0, err)
return err
Expand Down
8 changes: 5 additions & 3 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var _ tsoAllocatorEventSource = (*tsoServiceDiscovery)(nil)

// tsoServiceDiscovery is the service discovery client of the independent TSO service
type tsoServiceDiscovery struct {
clusterID uint64
keyspaceID uint32
urls atomic.Value // Store as []string
// primary key is the etcd path used for discoverying the serving endpoint of this keyspace
Expand Down Expand Up @@ -83,13 +84,14 @@ type tsoServiceDiscovery struct {

// newTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service.
func newTSOServiceDiscovery(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, metacli MetaStorageClient,
keyspaceID uint32, urls []string, tlsCfg *tlsutil.TLSConfig, option *option) ServiceDiscovery {
clusterID uint64, keyspaceID uint32, urls []string, tlsCfg *tlsutil.TLSConfig, option *option) ServiceDiscovery {
bc := &tsoServiceDiscovery{
ctx: ctx,
cancel: cancel,
wg: wg,
metacli: metacli,
keyspaceID: keyspaceID,
clusterID: clusterID,
primaryKey: path.Join(tsoPrimaryPrefix, fmt.Sprintf("%05d", 0), "primary"),
tlsCfg: tlsCfg,
option: option,
Expand Down Expand Up @@ -158,8 +160,8 @@ func (c *tsoServiceDiscovery) Close() {
}

// GetClusterID returns the ID of the cluster
func (c *tsoServiceDiscovery) GetClusterID(context.Context) uint64 {
return 0
func (c *tsoServiceDiscovery) GetClusterID() uint64 {
return c.clusterID
}

// GetURLs returns the URLs of the servers.
Expand Down
9 changes: 3 additions & 6 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,12 +557,9 @@ func (s *Server) startGRPCAndHTTPServers(l net.Listener) {
}

func (s *Server) startServer() (err error) {
// TODO: uncomment the following code to generate a unique cluster id from the given ClusterIDPath
// after we add rpc for the client to retrieve the cluster id from the server then use it in every
// request for verification.
// if s.clusterID, err = etcdutil.GetClusterID(s.etcdClient, utils.ClusterIDPath); err != nil {
// return err
// }
if s.clusterID, err = etcdutil.GetClusterID(s.etcdClient, utils.ClusterIDPath); err != nil {
return err
}
log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID))

// It may lose accuracy if use float64 to store uint64. So we store the cluster id in label.
Expand Down
8 changes: 8 additions & 0 deletions tests/mcs/tso/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/testutil"
tsosvr "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -79,3 +80,10 @@ func startSingleTSOTestServer(ctx context.Context, re *require.Assertions, backe

return s, cleanup, err
}

func setupCli(re *require.Assertions, ctx context.Context, endpoints []string, opts ...pd.ClientOption) pd.Client {
// TODO: we use keyspace 0 as the default keyspace for now, which mightn't need change in the future
cli, err := pd.NewTSOClientWithContext(ctx, 0, endpoints, pd.SecurityOption{}, opts...)
re.NoError(err)
return cli
}
102 changes: 102 additions & 0 deletions tests/mcs/tso/tso_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"context"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/suite"
tsosvr "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/tests"
)

const (
tsoRequestConcurrencyNumber = 1
tsoRequestRound = 30
)

type tsoServiceTestSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
cluster *tests.TestCluster
pdLeader *tests.TestServer
backendEndpoints string
tsoSvr1 *tsosvr.Server
tsoSvrCleanup1 CleanupFunc
}

func TestTSOServiceTestSuite(t *testing.T) {
suite.Run(t, new(tsoServiceTestSuite))
}

func (suite *tsoServiceTestSuite) SetupSuite() {
var err error
re := suite.Require()

suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestCluster(suite.ctx, 1)
re.NoError(err)

err = suite.cluster.RunInitialServers()
re.NoError(err)

leaderName := suite.cluster.WaitLeader()
suite.pdLeader = suite.cluster.GetServer(leaderName)
suite.backendEndpoints = suite.pdLeader.GetAddr()

suite.tsoSvr1, suite.tsoSvrCleanup1, err = startSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints)
re.NoError(err)
}

func (suite *tsoServiceTestSuite) TearDownSuite() {
suite.tsoSvrCleanup1()
suite.cluster.Destroy()
suite.cancel()
}

func (suite *tsoServiceTestSuite) TestTSOServerRegister() {
re := suite.Require()

endpoints := strings.Split(suite.backendEndpoints, ",")
cli1 := setupCli(re, suite.ctx, endpoints)
cli2 := setupCli(re, suite.ctx, endpoints)

var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
var lastTS uint64
for i := 0; i < tsoRequestRound; i++ {
physical, logical, err := cli1.GetTS(context.Background())
re.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
physical, logical, err = cli2.GetTS(context.Background())
re.NoError(err)
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
}()
}
wg.Wait()
}

0 comments on commit 2296562

Please sign in to comment.