Skip to content

Commit

Permalink
mcs, tso: implement GetMinTS gPRC on both API leader and PD client (t…
Browse files Browse the repository at this point in the history
…ikv#6488)

ref tikv#5895

implement GetMinTS gPRC on both API leader and the PD client.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored and rleungx committed Aug 2, 2023
1 parent 6a530a1 commit d4a2405
Show file tree
Hide file tree
Showing 16 changed files with 243 additions and 155 deletions.
51 changes: 41 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/tlsutil"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -614,10 +615,16 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
zap.String("new-mode", newMode.String()))
}

func (c *client) getServiceClientProxy() (*tsoClient, pdpb.ServiceMode) {
func (c *client) getTSOClient() *tsoClient {
c.RLock()
defer c.RUnlock()
return c.tsoClient, c.serviceMode
return c.tsoClient
}

func (c *client) getServiceMode() pdpb.ServiceMode {
c.RLock()
defer c.RUnlock()
return c.serviceMode
}

func (c *client) scheduleUpdateTokenConnection() {
Expand Down Expand Up @@ -782,7 +789,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
tsoClient, _ := c.getServiceClientProxy()
tsoClient := c.getTSOClient()
req.start = time.Now()
req.dcLocation = dcLocation

Expand Down Expand Up @@ -812,11 +819,8 @@ func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical in
}

func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
tsoClient, serviceMode := c.getServiceClientProxy()
if tsoClient == nil {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("tso client is nil")
}

// Handle compatibility issue in case of PD/API server doesn't support GetMinTS API.
serviceMode := c.getServiceMode()
switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode")
Expand All @@ -825,10 +829,37 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
// returning the default timeline should be fine.
return c.GetTS(ctx)
case pdpb.ServiceMode_API_SVC_MODE:
return tsoClient.getMinTS(ctx)
default:
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode")
}

// Call GetMinTS API to get the minimal TS from the API leader.
protoClient := c.getClient()
if protoClient == nil {
return 0, 0, errs.ErrClientGetProtoClient
}

resp, err := protoClient.GetMinTS(ctx, &pdpb.GetMinTSRequest{
Header: c.requestHeader(),
})
if err != nil {
if strings.Contains(err.Error(), "Unimplemented") {
// If the method is not supported, we fallback to GetTS.
return c.GetTS(ctx)
}
return 0, 0, errs.ErrClientGetMinTSO.Wrap(err).GenWithStackByCause()
}
if resp == nil {
attachErr := errors.Errorf("error:%s", "no min ts info collected")
return 0, 0, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
}
if resp.GetHeader().GetError() != nil {
attachErr := errors.Errorf("error:%s s", resp.GetHeader().GetError().String())
return 0, 0, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
}

minTS := resp.GetTimestamp()
return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
Expand Down Expand Up @@ -1522,7 +1553,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
// For test only.
func (c *client) GetTSOAllocators() *sync.Map {
tsoClient, _ := c.getServiceClientProxy()
tsoClient := c.getTSOClient()
if tsoClient == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4=
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
116 changes: 0 additions & 116 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
Expand Down Expand Up @@ -281,116 +278,3 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) {
}
return nil, ""
}

// getMinTS gets a timestamp from PD or the minimal timestamp across all keyspace groups from the TSO microservice.
func (c *tsoClient) getMinTS(ctx context.Context) (physical, logical int64, err error) {
// Immediately refresh the TSO server/pod list
addrs, err := c.svcDiscovery.DiscoverMicroservice(tsoService)
if err != nil {
return 0, 0, errs.ErrClientGetMinTSO.Wrap(err).GenWithStackByCause()
}
if len(addrs) == 0 {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("no tso servers/pods discovered")
}

// Get the minimal timestamp from the TSO servers/pods
var mutex sync.Mutex
resps := make([]*tsopb.GetMinTSResponse, 0)
wg := sync.WaitGroup{}
wg.Add(len(addrs))
for _, addr := range addrs {
go func(addr string) {
defer wg.Done()
resp, err := c.getMinTSFromSingleServer(ctx, addr, c.option.timeout)
if err != nil || resp == nil {
log.Warn("[tso] failed to get min ts from tso server",
zap.String("address", addr), zap.Error(err))
return
}
mutex.Lock()
defer mutex.Unlock()
resps = append(resps, resp)
}(addr)
}
wg.Wait()

// Check the results. The returned minimal timestamp is valid if all the conditions are met:
// 1. The number of responses is equal to the number of TSO servers/pods.
// 2. The number of keyspace groups asked is equal to the number of TSO servers/pods.
// 3. The minimal timestamp is not zero.
var (
minTS *pdpb.Timestamp
keyspaceGroupsAsked uint32
)
if len(resps) == 0 {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("none of tso server/pod responded")
}
emptyTS := &pdpb.Timestamp{}
keyspaceGroupsTotal := resps[0].KeyspaceGroupsTotal
for _, resp := range resps {
if resp.KeyspaceGroupsTotal == 0 {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service has no keyspace group")
}
if resp.KeyspaceGroupsTotal != keyspaceGroupsTotal {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs(
"the tso service has inconsistent keyspace group total count")
}
keyspaceGroupsAsked += resp.KeyspaceGroupsServing
if tsoutil.CompareTimestamp(resp.Timestamp, emptyTS) > 0 &&
(minTS == nil || tsoutil.CompareTimestamp(resp.Timestamp, minTS) < 0) {
minTS = resp.Timestamp
}
}

if keyspaceGroupsAsked != keyspaceGroupsTotal {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs(
fmt.Sprintf("can't query all the tso keyspace groups. Asked %d, expected %d",
keyspaceGroupsAsked, keyspaceGroupsTotal))
}

if minTS == nil {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service is not ready")
}

return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil
}

func (c *tsoClient) getMinTSFromSingleServer(
ctx context.Context, tsoSrvAddr string, timeout time.Duration,
) (*tsopb.GetMinTSResponse, error) {
cc, err := c.svcDiscovery.GetOrCreateGRPCConn(tsoSrvAddr)
if err != nil {
return nil, errs.ErrClientGetMinTSO.FastGenByArgs(
fmt.Sprintf("can't connect to tso server %s", tsoSrvAddr))
}

cctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

resp, err := tsopb.NewTSOClient(cc).GetMinTS(
cctx, &tsopb.GetMinTSRequest{
Header: &tsopb.RequestHeader{
ClusterId: c.svcDiscovery.GetClusterID(),
KeyspaceId: c.svcDiscovery.GetKeyspaceID(),
KeyspaceGroupId: c.svcDiscovery.GetKeyspaceGroupID(),
},
DcLocation: globalDCLocation,
})
if err != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
err, cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
}
if resp == nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
"no min ts info collected", cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
}
if resp.GetHeader().GetError() != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
resp.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
}

return resp, nil
}
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ error = '''
get member failed
'''

["PD:client:ErrClientGetMinTSO"]
error = '''
get min TSO failed, %v
'''

["PD:client:ErrClientGetTSO"]
error = '''
get TSO failed, %v
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4=
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ var (
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
)

// schedule errors
Expand Down
Loading

0 comments on commit d4a2405

Please sign in to comment.