Skip to content

Commit

Permalink
Provide GetMinTS API to solve the compatibility issue brought by mult…
Browse files Browse the repository at this point in the history
…i-timeline tso (tikv#6421)

ref tikv#6142

1. Import kvproto change to introduce GetMinTS rpc in the TSO service.
6. Add server side implementation for GetMinTS rpc.
7. Add client side implementation for GetMinTS rpc.
8. Add unit test

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored and rleungx committed Aug 2, 2023
1 parent 9d47a9f commit 80ea8e9
Show file tree
Hide file tree
Showing 26 changed files with 402 additions and 72 deletions.
53 changes: 36 additions & 17 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,20 +252,20 @@ type serviceModeKeeper struct {
// triggering service mode switching concurrently.
sync.RWMutex
serviceMode pdpb.ServiceMode
tsoClient atomic.Value // *tsoClient
tsoClient *tsoClient
tsoSvcDiscovery ServiceDiscovery
}

func (smk *serviceModeKeeper) close() {
smk.Lock()
defer smk.Unlock()
switch smk.serviceMode {
func (k *serviceModeKeeper) close() {
k.Lock()
defer k.Unlock()
switch k.serviceMode {
case pdpb.ServiceMode_API_SVC_MODE:
smk.tsoSvcDiscovery.Close()
k.tsoSvcDiscovery.Close()
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
if tsoCli := smk.tsoClient.Load(); tsoCli != nil {
tsoCli.(*tsoClient).Close()
if k.tsoClient != nil {
k.tsoClient.Close()
}
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
}
Expand Down Expand Up @@ -486,8 +486,8 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
}
newTSOCli.Setup()
// Replace the old TSO client.
oldTSOClient := c.getTSOClient()
c.tsoClient.Store(newTSOCli)
oldTSOClient := c.tsoClient
c.tsoClient = newTSOCli
oldTSOClient.Close()
// Replace the old TSO service discovery if needed.
oldTSOSvcDiscovery := c.tsoSvcDiscovery
Expand All @@ -506,11 +506,10 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
zap.String("new-mode", newMode.String()))
}

func (c *client) getTSOClient() *tsoClient {
if tsoCli := c.tsoClient.Load(); tsoCli != nil {
return tsoCli.(*tsoClient)
}
return nil
func (c *client) getServiceClientProxy() (*tsoClient, pdpb.ServiceMode) {
c.RLock()
defer c.RUnlock()
return c.tsoClient, c.serviceMode
}

func (c *client) scheduleUpdateTokenConnection() {
Expand Down Expand Up @@ -675,7 +674,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
tsoClient := c.getTSOClient()
tsoClient, _ := c.getServiceClientProxy()
req.start = time.Now()
req.dcLocation = dcLocation

Expand Down Expand Up @@ -704,6 +703,26 @@ func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical in
return resp.Wait()
}

func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
tsoClient, serviceMode := c.getServiceClientProxy()
if tsoClient == nil {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("tso client is nil")
}

switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode")
case pdpb.ServiceMode_PD_SVC_MODE:
// If the service mode is switched to API during GetTS() call, which happens during migration,
// returning the default timeline should be fine.
return c.GetTS(ctx)
case pdpb.ServiceMode_API_SVC_MODE:
return tsoClient.getMinTS(ctx)
default:
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode")
}
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
if res.Region == nil {
return nil
Expand Down Expand Up @@ -1395,7 +1414,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
// For test only.
func (c *client) GetTSOAllocators() *sync.Map {
tsoClient := c.getTSOClient()
tsoClient, _ := c.getServiceClientProxy()
if tsoClient == nil {
return nil
}
Expand Down
13 changes: 7 additions & 6 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/testutil"
"github.com/tikv/pd/client/tlsutil"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/goleak"
"google.golang.org/grpc"
)
Expand All @@ -32,13 +33,13 @@ func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, testutil.LeakOptions...)
}

func TestTsLessEqual(t *testing.T) {
func TestTSLessEqual(t *testing.T) {
re := require.New(t)
re.True(tsLessEqual(9, 9, 9, 9))
re.True(tsLessEqual(8, 9, 9, 8))
re.False(tsLessEqual(9, 8, 8, 9))
re.False(tsLessEqual(9, 8, 9, 6))
re.True(tsLessEqual(9, 6, 9, 8))
re.True(tsoutil.TSLessEqual(9, 9, 9, 9))
re.True(tsoutil.TSLessEqual(8, 9, 9, 8))
re.False(tsoutil.TSLessEqual(9, 8, 8, 9))
re.False(tsoutil.TSLessEqual(9, 8, 9, 6))
re.True(tsoutil.TSLessEqual(9, 6, 9, 8))
}

func TestUpdateURLs(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E=
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4=
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
127 changes: 123 additions & 4 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,29 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// TSOClient is the client used to get timestamps.
type TSOClient interface {
// GetTS gets a timestamp from PD.
// GetTS gets a timestamp from PD or TSO microservice.
GetTS(ctx context.Context) (int64, int64, error)
// GetTSAsync gets a timestamp from PD, without block the caller.
// GetTSAsync gets a timestamp from PD or TSO microservice, without block the caller.
GetTSAsync(ctx context.Context) TSFuture
// GetLocalTS gets a local timestamp from PD.
// GetLocalTS gets a local timestamp from PD or TSO microservice.
GetLocalTS(ctx context.Context, dcLocation string) (int64, int64, error)
// GetLocalTSAsync gets a local timestamp from PD, without block the caller.
// GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller.
GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture
// GetMinTS gets a timestamp from PD or the minimal timestamp across all keyspace groups from
// the TSO microservice.
GetMinTS(ctx context.Context) (int64, int64, error)
}

type tsoRequest struct {
Expand Down Expand Up @@ -275,3 +281,116 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) {
}
return nil, ""
}

// getMinTS gets a timestamp from PD or the minimal timestamp across all keyspace groups from the TSO microservice.
func (c *tsoClient) getMinTS(ctx context.Context) (physical, logical int64, err error) {
// Immediately refresh the TSO server/pod list
addrs, err := c.svcDiscovery.DiscoverMicroservice(tsoService)
if err != nil {
return 0, 0, errs.ErrClientGetMinTSO.Wrap(err).GenWithStackByCause()
}
if len(addrs) == 0 {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("no tso servers/pods discovered")
}

// Get the minimal timestamp from the TSO servers/pods
var mutex sync.Mutex
resps := make([]*tsopb.GetMinTSResponse, 0)
wg := sync.WaitGroup{}
wg.Add(len(addrs))
for _, addr := range addrs {
go func(addr string) {
defer wg.Done()
resp, err := c.getMinTSFromSingleServer(ctx, addr, c.option.timeout)
if err != nil || resp == nil {
log.Warn("[tso] failed to get min ts from tso server",
zap.String("address", addr), zap.Error(err))
return
}
mutex.Lock()
defer mutex.Unlock()
resps = append(resps, resp)
}(addr)
}
wg.Wait()

// Check the results. The returned minimal timestamp is valid if all the conditions are met:
// 1. The number of responses is equal to the number of TSO servers/pods.
// 2. The number of keyspace groups asked is equal to the number of TSO servers/pods.
// 3. The minimal timestamp is not zero.
var (
minTS *pdpb.Timestamp
keyspaceGroupsAsked uint32
)
if len(resps) == 0 {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("none of tso server/pod responded")
}
emptyTS := &pdpb.Timestamp{}
keyspaceGroupsTotal := resps[0].KeyspaceGroupsTotal
for _, resp := range resps {
if resp.KeyspaceGroupsTotal == 0 {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service has no keyspace group")
}
if resp.KeyspaceGroupsTotal != keyspaceGroupsTotal {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs(
"the tso service has inconsistent keyspace group total count")
}
keyspaceGroupsAsked += resp.KeyspaceGroupsServing
if tsoutil.CompareTimestamp(resp.Timestamp, emptyTS) > 0 &&
(minTS == nil || tsoutil.CompareTimestamp(resp.Timestamp, minTS) < 0) {
minTS = resp.Timestamp
}
}

if keyspaceGroupsAsked != keyspaceGroupsTotal {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs(
fmt.Sprintf("can't query all the tso keyspace groups. Asked %d, expected %d",
keyspaceGroupsAsked, keyspaceGroupsTotal))
}

if minTS == nil {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service is not ready")
}

return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil
}

func (c *tsoClient) getMinTSFromSingleServer(
ctx context.Context, tsoSrvAddr string, timeout time.Duration,
) (*tsopb.GetMinTSResponse, error) {
cc, err := c.svcDiscovery.GetOrCreateGRPCConn(tsoSrvAddr)
if err != nil {
return nil, errs.ErrClientGetMinTSO.FastGenByArgs(
fmt.Sprintf("can't connect to tso server %s", tsoSrvAddr))
}

cctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

resp, err := tsopb.NewTSOClient(cc).GetMinTS(
cctx, &tsopb.GetMinTSRequest{
Header: &tsopb.RequestHeader{
ClusterId: c.svcDiscovery.GetClusterID(),
KeyspaceId: c.svcDiscovery.GetKeyspaceID(),
KeyspaceGroupId: c.svcDiscovery.GetKeyspaceGroupID(),
},
DcLocation: globalDCLocation,
})
if err != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
err, cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
}
if resp == nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
"no min ts info collected", cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
}
if resp.GetHeader().GetError() != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
resp.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
}

return resp, nil
}
21 changes: 5 additions & 16 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -715,19 +716,14 @@ func (c *tsoClient) processRequests(
return err
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := addLogical(logical, -count+1, suffixBits)
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
return nil
}

// Because of the suffix, we need to shift the count before we add it to the logical part.
func addLogical(logical, count int64, suffixBits uint32) int64 {
return logical + count<<suffixBits
}

func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical int64, suffixBits uint32, count int64) {
largestLogical := addLogical(firstLogical, count-1, suffixBits)
largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits)
lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{
physical: physical,
// Save the largest logical part here
Expand All @@ -742,7 +738,7 @@ func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical i
// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf("%s timestamp fallback, newly acquired ts (%d, %d) is less or equal to last one (%d, %d)",
dcLocation, physical, firstLogical, lastPhysical, lastLogical))
}
Expand All @@ -751,19 +747,12 @@ func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical i
lastTSOPointer.logical = largestLogical
}

func tsLessEqual(physical, logical, thatPhysical, thatLogical int64) bool {
if physical == thatPhysical {
return logical <= thatLogical
}
return physical < thatPhysical
}

func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < len(requests); i++ {
if span := opentracing.SpanFromContext(requests[i].requestCtx); span != nil {
span.Finish()
}
requests[i].physical, requests[i].logical = physical, addLogical(firstLogical, int64(i), suffixBits)
requests[i].physical, requests[i].logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
requests[i].done <- err
}
}
8 changes: 5 additions & 3 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ type tsoServerDiscovery struct {
failureCount int
}

func (t *tsoServerDiscovery) countFailure() {
func (t *tsoServerDiscovery) countFailure() bool {
t.Lock()
defer t.Unlock()
t.failureCount++
return t.failureCount >= len(t.addrs)
}

func (t *tsoServerDiscovery) resetFailure() {
Expand Down Expand Up @@ -414,8 +415,9 @@ func (c *tsoServiceDiscovery) updateMember() error {
}
keyspaceGroup, err := c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout)
if err != nil {
c.tsoServerDiscovery.countFailure()
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))
if c.tsoServerDiscovery.countFailure() {
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))
}
return err
}
c.tsoServerDiscovery.resetFailure()
Expand Down
Loading

0 comments on commit 80ea8e9

Please sign in to comment.