tso: implement the global TSO generation algorithm (tikv#3033)
Signed-off-by: JmPotato <[email protected]>
JmPotato authored Oct 14, 2020
1 parent 8558f34 commit e0747f9
Showing 10 changed files with 542 additions and 74 deletions.
1 change: 1 addition & 0 deletions pkg/errs/errno.go
@@ -31,6 +31,7 @@ var (
var (
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrInvalidTimestamp = errors.Normalize("invalid timestamp", errors.RFCCodeText("PD:tso:ErrInvalidTimestamp"))
14 changes: 14 additions & 0 deletions pkg/tsoutil/tso.go
@@ -51,3 +51,17 @@ func GenerateTimestamp(physical time.Time, logical uint64) *pdpb.Timestamp {
Logical: int64(logical),
}
}

// CompareTimestamp is used to compare two timestamps.
// If tsoOne > tsoTwo, returns 1.
// If tsoOne = tsoTwo, returns 0.
// If tsoOne < tsoTwo, returns -1.
func CompareTimestamp(tsoOne, tsoTwo *pdpb.Timestamp) int {
if tsoOne.GetPhysical() > tsoTwo.GetPhysical() || (tsoOne.GetPhysical() == tsoTwo.GetPhysical() && tsoOne.GetLogical() > tsoTwo.GetLogical()) {
return 1
}
if tsoOne.GetPhysical() == tsoTwo.GetPhysical() && tsoOne.GetLogical() == tsoTwo.GetLogical() {
return 0
}
return -1
}
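
A minimal usage sketch of the new helper (the values are invented for illustration):

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/tikv/pd/pkg/tsoutil"
)

func main() {
	a := &pdpb.Timestamp{Physical: 100, Logical: 5}
	b := &pdpb.Timestamp{Physical: 100, Logical: 7}
	fmt.Println(tsoutil.CompareTimestamp(a, b)) // -1: equal physical parts, so the logical part decides
	fmt.Println(tsoutil.CompareTimestamp(b, a)) // 1
	fmt.Println(tsoutil.CompareTimestamp(a, a)) // 0
}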
6 changes: 3 additions & 3 deletions server/grpc_service.go
@@ -939,7 +939,7 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest)
return nil, err
}
var processedDCs []string
if request.GetMaxTs() == nil || request.GetMaxTs().Physical == 0 {
if request.GetMaxTs() == nil || request.GetMaxTs().GetPhysical() == 0 {
// The first phase of synchronization: collect the max local ts
var maxLocalTS pdpb.Timestamp
for _, allocator := range allocatorLeaders {
@@ -952,8 +952,8 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest)
if err != nil {
return nil, err
}
if currentLocalTSO.Physical > maxLocalTS.Physical {
maxLocalTS.Physical = currentLocalTSO.Physical
if tsoutil.CompareTimestamp(&currentLocalTSO, &maxLocalTS) > 0 {
maxLocalTS = currentLocalTSO
}
processedDCs = append(processedDCs, allocator.GetDCLocation())
}
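The loop above reduces the Local TSO Allocator leaders' current TSOs to their maximum via the new CompareTimestamp helper; the same reduction in isolation might look like this (a stand-alone sketch with a hypothetical maxOf helper, not part of the patch):

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/tikv/pd/pkg/tsoutil"
)

// maxOf returns the largest of the given timestamps under the
// physical-first, then logical ordering used by tsoutil.CompareTimestamp.
func maxOf(localTSOs []pdpb.Timestamp) pdpb.Timestamp {
	var max pdpb.Timestamp
	for _, ts := range localTSOs {
		if tsoutil.CompareTimestamp(&ts, &max) > 0 {
			max = ts
		}
	}
	return max
}

func main() {
	tsos := []pdpb.Timestamp{
		{Physical: 103, Logical: 9},
		{Physical: 105, Logical: 2},
	}
	fmt.Println(maxOf(tsos)) // the maximum is {Physical: 105, Logical: 2}
}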
2 changes: 1 addition & 1 deletion server/server.go
@@ -356,7 +356,7 @@ func (s *Server) startServer(ctx context.Context) error {
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member, s.rootPath, s.cfg.TSOSaveInterval.Duration, s.cfg.TSOUpdatePhysicalInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
)
s.GetTLSConfig())
if err = s.tsoAllocatorManager.SetLocalTSOConfig(s.cfg.LocalTSO); err != nil {
return err
}
13 changes: 8 additions & 5 deletions server/tso/allocator_manager.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/etcdutil"
"github.com/tikv/pd/pkg/grpcutil"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/election"
@@ -79,6 +80,7 @@ type AllocatorManager struct {
saveInterval time.Duration
updatePhysicalInterval time.Duration
maxResetTSGap func() time.Duration
securityConfig *grpcutil.TLSConfig
}

// NewAllocatorManager creates a new TSO Allocator Manager.
@@ -88,6 +90,7 @@ func NewAllocatorManager(
saveInterval time.Duration,
updatePhysicalInterval time.Duration,
maxResetTSGap func() time.Duration,
sc *grpcutil.TLSConfig,
) *AllocatorManager {
allocatorManager := &AllocatorManager{
allocatorGroups: make(map[string]*allocatorGroup),
@@ -96,6 +99,7 @@ func NewAllocatorManager(
saveInterval: saveInterval,
updatePhysicalInterval: updatePhysicalInterval,
maxResetTSGap: maxResetTSGap,
securityConfig: sc,
}
return allocatorManager
}
@@ -181,7 +185,7 @@ func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, dcLocation

var allocator Allocator
if dcLocation == config.GlobalDCLocation {
allocator = NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.updatePhysicalInterval, am.maxResetTSGap)
allocator = NewGlobalTSOAllocator(am, leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.updatePhysicalInterval, am.maxResetTSGap)
} else {
allocator = NewLocalTSOAllocator(am.member, leadership, dcLocation, am.saveInterval, am.updatePhysicalInterval, am.maxResetTSGap)
}
@@ -552,11 +556,11 @@ func (am *AllocatorManager) deleteAllocatorGroup(dcLocation string) {
// HandleTSORequest forwards TSO allocation requests to correct TSO Allocators.
func (am *AllocatorManager) HandleTSORequest(dcLocation string, count uint32) (pdpb.Timestamp, error) {
am.RLock()
defer am.RUnlock()
if len(dcLocation) == 0 {
dcLocation = config.GlobalDCLocation
}
allocatorGroup, exist := am.allocatorGroups[dcLocation]
am.RUnlock()
if !exist {
err := errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, generate timestamp failed", dcLocation))
return pdpb.Timestamp{}, err
@@ -612,9 +616,8 @@ func (am *AllocatorManager) GetAllocators(filters ...AllocatorGroupFilter) []All
func (am *AllocatorManager) GetLocalAllocatorLeaders() ([]*LocalTSOAllocator, error) {
localAllocators := am.GetAllocators(
FilterDCLocation(config.GlobalDCLocation),
FilterUnavailableLeadership(),
FilterUninitialized())
localAllocatorLeaders := make([]*LocalTSOAllocator, len(localAllocators))
FilterUnavailableLeadership())
localAllocatorLeaders := make([]*LocalTSOAllocator, 0, len(localAllocators))
for _, localAllocator := range localAllocators {
localAllocatorLeader, ok := localAllocator.(*LocalTSOAllocator)
if !ok {
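Besides dropping FilterUninitialized, GetLocalAllocatorLeaders now allocates its result slice with a zero length and a capacity, which matters when entries are appended (presumably what the loop below does): a length-based make pre-fills the slice with nil entries that later appends would trail. A minimal stand-alone illustration, not taken from the PD code:

package main

import "fmt"

func main() {
	// make with a non-zero length pre-fills the slice with zero values,
	// so a later append trails behind them.
	withLen := make([]*int, 3)
	withLen = append(withLen, new(int))
	fmt.Println(len(withLen)) // 4: three nil entries plus the appended one

	// A zero length with a capacity only reserves space.
	withCap := make([]*int, 0, 3)
	withCap = append(withCap, new(int))
	fmt.Println(len(withCap)) // 1: only the appended entry
}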
201 changes: 198 additions & 3 deletions server/tso/global_allocator.go
@@ -14,10 +14,21 @@
package tso

import (
"context"
"fmt"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/grpcutil"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/election"
"go.uber.org/zap"
"google.golang.org/grpc"
)

// Allocator is a Timestamp Oracle allocator.
@@ -46,17 +57,25 @@ type GlobalTSOAllocator struct {
// to determine whether a TSO request could be processed.
leadership *election.Leadership
timestampOracle *timestampOracle
// for global TSO synchronization
allocatorManager *AllocatorManager
// for gRPC use
localAllocatorConn struct {
sync.RWMutex
clientConns map[string]*grpc.ClientConn
}
}

// NewGlobalTSOAllocator creates a new global TSO allocator.
func NewGlobalTSOAllocator(
am *AllocatorManager,
leadership *election.Leadership,
rootPath string,
saveInterval time.Duration,
updatePhysicalInterval time.Duration,
maxResetTSGap func() time.Duration,
) Allocator {
return &GlobalTSOAllocator{
gta := &GlobalTSOAllocator{
leadership: leadership,
timestampOracle: &timestampOracle{
client: leadership.GetClient(),
@@ -65,7 +84,10 @@ func NewGlobalTSOAllocator(
updatePhysicalInterval: updatePhysicalInterval,
maxResetTSGap: maxResetTSGap,
},
allocatorManager: am,
}
gta.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn)
return gta
}

// Initialize will initialize the created global TSO allocator.
@@ -91,8 +113,181 @@ func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error {
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
// Todo: implement the synchronization algorithm for global TSO generation
return gta.timestampOracle.getTS(gta.leadership, count)
// To check if we have any dc-location configured in the cluster
dcLocationMap, err := gta.allocatorManager.GetClusterDCLocations()
if err != nil {
log.Error("get cluster dc-locations info failed", errs.ZapError(err))
return pdpb.Timestamp{}, err
}
// No dc-locations configured in the cluster
if len(dcLocationMap) == 0 {
return gta.timestampOracle.getTS(gta.leadership, count)
}
// Send maxTS to all Local TSO Allocator leaders to prewrite
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
maxTSO := &pdpb.Timestamp{}
// Collect the MaxTS with all Local TSO Allocator leaders first
if err = gta.syncMaxTS(ctx, dcLocationMap, maxTSO); err != nil {
return pdpb.Timestamp{}, err
}
maxTSO.Logical += int64(count)
// Sync the MaxTS with all Local TSO Allocator leaders then
if err := gta.syncMaxTS(ctx, dcLocationMap, maxTSO); err != nil {
return pdpb.Timestamp{}, err
}
var currentGlobalTSO pdpb.Timestamp
if currentGlobalTSO, err = gta.getCurrentTSO(); err != nil {
return pdpb.Timestamp{}, err
}
if tsoutil.CompareTimestamp(&currentGlobalTSO, maxTSO) < 0 {
// Update the global TSO in memory
if err := gta.SetTSO(tsoutil.GenerateTS(maxTSO)); err != nil {
return pdpb.Timestamp{}, err
}
}
return *maxTSO, nil
}

const (
dialTimeout = 3 * time.Second
rpcTimeout = 3 * time.Second
)

func (gta *GlobalTSOAllocator) syncMaxTS(ctx context.Context, dcLocationMap map[string][]uint64, maxTSO *pdpb.Timestamp) error {
maxRetryCount := 1
for i := 0; i < maxRetryCount; i++ {
// Collect all allocator leaders' client URLs
allocatorLeaders, err := gta.allocatorManager.GetLocalAllocatorLeaders()
if err != nil {
return err
}
leaderURLs := make([]string, 0, len(allocatorLeaders))
for _, allocator := range allocatorLeaders {
// Check if its client URLs are empty
if len(allocator.GetMember().GetClientUrls()) < 1 {
continue
}
leaderURL := allocator.GetMember().GetClientUrls()[0]
if slice.NoneOf(leaderURLs, func(i int) bool { return leaderURLs[i] == leaderURL }) {
leaderURLs = append(leaderURLs, leaderURL)
}
}
// Prepare to make RPC requests concurrently
respCh := make(chan *pdpb.SyncMaxTSResponse, len(leaderURLs))
errCh := make(chan error, len(leaderURLs))
var errList []error
wg := sync.WaitGroup{}
for _, leaderURL := range leaderURLs {
leaderConn, err := gta.getOrCreateGRPCConn(ctx, leaderURL)
if err != nil {
return err
}
wg.Add(1)
go func(ctx context.Context, conn *grpc.ClientConn, respCh chan<- *pdpb.SyncMaxTSResponse, errCh chan<- error) {
request := &pdpb.SyncMaxTSRequest{
Header: &pdpb.RequestHeader{
SenderId: gta.allocatorManager.member.ID(),
},
}
if maxTSO.GetPhysical() != 0 {
request.MaxTs = maxTSO
}
syncCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
resp, err := pdpb.NewPDClient(conn).SyncMaxTS(syncCtx, request)
cancel()
if err != nil {
errCh <- err
log.Error("sync max ts rpc failed, got an error", zap.String("local-allocator-leader-url", leaderConn.Target()), zap.Error(err))
}
respCh <- resp
if resp == nil {
log.Error("sync max ts rpc failed, got a nil response", zap.String("local-allocator-leader-url", leaderConn.Target()))
}
wg.Done()
}(ctx, leaderConn, respCh, errCh)
}
wg.Wait()
close(respCh)
close(errCh)
// If any error occurs, the synchronization process will fail
if err := <-errCh; err != nil {
errList = append(errList, err)
}
if len(errList) > 0 {
return errs.ErrSyncMaxTS.FastGenWithCause(errList)
}
var syncedDCs []string
for resp := range respCh {
if resp == nil {
return errs.ErrSyncMaxTS.FastGenWithCause("got nil response")
}
syncedDCs = append(syncedDCs, resp.GetDcs()...)
// Compare and get the max one
if resp.GetMaxLocalTs() != nil && resp.GetMaxLocalTs().GetPhysical() != 0 {
if tsoutil.CompareTimestamp(resp.GetMaxLocalTs(), maxTSO) > 0 {
*maxTSO = *(resp.GetMaxLocalTs())
}
}
}
if !gta.checkSyncedDCs(dcLocationMap, syncedDCs) {
// Only retry one time when synchronization is incomplete
if maxRetryCount == 1 {
log.Warn("unsynced dc-locations found, will retry", zap.Strings("syncedDCs", syncedDCs))
maxRetryCount++
continue
}
return errs.ErrSyncMaxTS.FastGenWithCause(fmt.Sprintf("unsynced dc-locations found, synced dc-locations: %+v", syncedDCs))
}
}
return nil
}

func (gta *GlobalTSOAllocator) checkSyncedDCs(dcLocationMap map[string][]uint64, syncedDCs []string) bool {
unsyncedDCs := make([]string, 0)
for dcLocation := range dcLocationMap {
if slice.NoneOf(syncedDCs, func(i int) bool { return syncedDCs[i] == dcLocation }) {
unsyncedDCs = append(unsyncedDCs, dcLocation)
}
}
log.Info("check unsynced dc-locations", zap.Strings("unsyncedDCs", unsyncedDCs), zap.Strings("syncedDCs", syncedDCs))
return len(unsyncedDCs) == 0
}

func (gta *GlobalTSOAllocator) getOrCreateGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) {
gta.localAllocatorConn.RLock()
conn, ok := gta.localAllocatorConn.clientConns[addr]
gta.localAllocatorConn.RUnlock()
if ok {
return conn, nil
}
tlsCfg, err := gta.allocatorManager.securityConfig.ToTLSConfig()
if err != nil {
return nil, err
}
ctxWithTimeout, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
cc, err := grpcutil.GetClientConn(ctxWithTimeout, addr, tlsCfg)
if err != nil {
return nil, err
}
gta.localAllocatorConn.Lock()
defer gta.localAllocatorConn.Unlock()
if old, ok := gta.localAllocatorConn.clientConns[addr]; ok {
cc.Close()
log.Debug("use old connection", zap.String("target", cc.Target()), zap.String("state", cc.GetState().String()))
return old, nil
}
gta.localAllocatorConn.clientConns[addr] = cc
return cc, nil
}

func (gta *GlobalTSOAllocator) getCurrentTSO() (pdpb.Timestamp, error) {
currentPhysical, currentLogical := gta.timestampOracle.getTSO()
if currentPhysical == typeutil.ZeroTime {
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized")
}
return *tsoutil.GenerateTimestamp(currentPhysical, uint64(currentLogical)), nil
}

// Reset is used to reset the TSO allocator.
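To make the two synchronization rounds in GenerateTSO concrete, a hypothetical trace of GenerateTSO(3) with two dc-locations (all values invented) could run as follows:

Round 1 (the request carries no MaxTs): dc-1 reports {Physical: 105, Logical: 2}, dc-2 reports {Physical: 103, Logical: 9}; the collected maximum is {Physical: 105, Logical: 2}.
Reserving count = 3 logical timestamps bumps maxTSO to {Physical: 105, Logical: 5}.
Round 2 (the request carries MaxTs): every Local TSO Allocator leader writes {Physical: 105, Logical: 5} into its own timestamp oracle, so no local allocator will hand out a smaller TSO afterwards.
The caller receives {Physical: 105, Logical: 5}, and the global allocator advances its own in-memory TSO to that value if it is behind.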
5 changes: 2 additions & 3 deletions server/tso/local_allocator.go
@@ -147,9 +147,8 @@ func (lta *LocalTSOAllocator) WriteTSO(maxTS *pdpb.Timestamp) error {
if err != nil {
return err
}
// If current local TSO has already been greater than
// maxTS, then do not update it.
if currentTSO.Physical >= maxTS.Physical {
// If current local TSO has already been greater or equal to maxTS, then do not update it.
if tsoutil.CompareTimestamp(&currentTSO, maxTS) >= 0 {
return nil
}
return lta.SetTSO(tsoutil.GenerateTS(maxTS))

