Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: implement the global TSO generation algorithm #3033

Merged
merged 25 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e13db13
Finish the sync of global TSO
JmPotato Sep 17, 2020
2a50dc0
Resolve the conflicts
JmPotato Sep 27, 2020
f964851
Remove unused error
JmPotato Sep 27, 2020
d6dc7e2
Update the algorithm
JmPotato Sep 27, 2020
1be0612
Merge branch 'master' of github.com:tikv/pd into global_local_tso_sync
JmPotato Sep 28, 2020
99f8d11
Update CompareTimestamp
JmPotato Sep 28, 2020
955218d
Merge branch 'master' of github.com:tikv/pd into global_local_tso_sync
JmPotato Sep 28, 2020
87e730e
Address comments
JmPotato Sep 28, 2020
b537068
Bug fix for a c.Assert
JmPotato Sep 28, 2020
700223e
Merge branch 'master' of github.com:tikv/pd into global_local_tso_sync
JmPotato Sep 28, 2020
049e395
Bug fix
JmPotato Sep 28, 2020
e4b6041
Merge branch 'master' of github.com:tikv/pd into global_local_tso_sync
JmPotato Sep 28, 2020
512d19d
Address comments
JmPotato Sep 28, 2020
fa0e5b6
Merge branch 'master' of github.com:tikv/pd into global_local_tso_sync
JmPotato Sep 29, 2020
3d90119
Update the TLSConfig
JmPotato Sep 29, 2020
6d8ca79
Merge branch 'master' into global_local_tso_sync
JmPotato Sep 30, 2020
3424180
Refine
JmPotato Sep 30, 2020
36be590
Add global TSO setting check
JmPotato Sep 30, 2020
3e1f159
Merge branch 'master' of github.com:tikv/pd into global_local_tso_sync
JmPotato Oct 9, 2020
b528c33
Resolve conflicts
JmPotato Oct 10, 2020
000fe33
Refine logs
JmPotato Oct 10, 2020
e49a0bc
Update the GetLocalAllocatorLeaders
JmPotato Oct 10, 2020
8301ad6
Merge branch 'master' of github.com:tikv/pd into global_local_tso_sync
JmPotato Oct 10, 2020
0c5f21a
Address comments
JmPotato Oct 10, 2020
9ede0ec
Merge branch 'master' into global_local_tso_sync
ti-srebot Oct 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
var (
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrInvalidTimestamp = errors.Normalize("invalid timestamp", errors.RFCCodeText("PD:tso:ErrInvalidTimestamp"))
Expand Down
14 changes: 14 additions & 0 deletions pkg/tsoutil/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,17 @@ func GenerateTimestamp(physical time.Time, logical uint64) *pdpb.Timestamp {
Logical: int64(logical),
}
}

// CompareTimestamp is used to compare two timestamps.
// If tsoOne > tsoTwo, returns 1.
// If tsoOne = tsoTwo, returns 0.
// If tsoOne < tsoTwo, returns -1.
func CompareTimestamp(tsoOne, tsoTwo *pdpb.Timestamp) int {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
if tsoOne.GetPhysical() > tsoTwo.GetPhysical() || (tsoOne.GetPhysical() == tsoTwo.GetPhysical() && tsoOne.GetLogical() > tsoTwo.GetLogical()) {
return 1
}
if tsoOne.GetPhysical() == tsoTwo.GetPhysical() && tsoOne.GetLogical() == tsoTwo.GetLogical() {
return 0
}
return -1
}
6 changes: 3 additions & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest)
return nil, err
}
var processedDCs []string
if request.GetMaxTs() == nil || request.GetMaxTs().Physical == 0 {
if request.GetMaxTs() == nil || request.GetMaxTs().GetPhysical() == 0 {
// The first phase of synchronization: collect the max local ts
var maxLocalTS pdpb.Timestamp
for _, allocator := range allocatorLeaders {
Expand All @@ -952,8 +952,8 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest)
if err != nil {
return nil, err
}
if currentLocalTSO.Physical > maxLocalTS.Physical {
maxLocalTS.Physical = currentLocalTSO.Physical
if tsoutil.CompareTimestamp(&currentLocalTSO, &maxLocalTS) > 0 {
maxLocalTS = currentLocalTSO
}
processedDCs = append(processedDCs, allocator.GetDCLocation())
}
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (s *Server) startServer(ctx context.Context) error {
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member, s.rootPath, s.cfg.TSOSaveInterval.Duration, s.cfg.TSOUpdatePhysicalInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
)
s.GetTLSConfig())
if err = s.tsoAllocatorManager.SetLocalTSOConfig(s.cfg.LocalTSO); err != nil {
return err
}
Expand Down
13 changes: 8 additions & 5 deletions server/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/etcdutil"
"github.com/tikv/pd/pkg/grpcutil"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/election"
Expand Down Expand Up @@ -79,6 +80,7 @@ type AllocatorManager struct {
saveInterval time.Duration
updatePhysicalInterval time.Duration
maxResetTSGap func() time.Duration
securityConfig *grpcutil.TLSConfig
}

// NewAllocatorManager creates a new TSO Allocator Manager.
Expand All @@ -88,6 +90,7 @@ func NewAllocatorManager(
saveInterval time.Duration,
updatePhysicalInterval time.Duration,
maxResetTSGap func() time.Duration,
sc *grpcutil.TLSConfig,
) *AllocatorManager {
allocatorManager := &AllocatorManager{
allocatorGroups: make(map[string]*allocatorGroup),
Expand All @@ -96,6 +99,7 @@ func NewAllocatorManager(
saveInterval: saveInterval,
updatePhysicalInterval: updatePhysicalInterval,
maxResetTSGap: maxResetTSGap,
securityConfig: sc,
}
return allocatorManager
}
Expand Down Expand Up @@ -181,7 +185,7 @@ func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, dcLocation

var allocator Allocator
if dcLocation == config.GlobalDCLocation {
allocator = NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.updatePhysicalInterval, am.maxResetTSGap)
allocator = NewGlobalTSOAllocator(am, leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.updatePhysicalInterval, am.maxResetTSGap)
} else {
allocator = NewLocalTSOAllocator(am.member, leadership, dcLocation, am.saveInterval, am.updatePhysicalInterval, am.maxResetTSGap)
}
Expand Down Expand Up @@ -552,11 +556,11 @@ func (am *AllocatorManager) deleteAllocatorGroup(dcLocation string) {
// HandleTSORequest forwards TSO allocation requests to correct TSO Allocators.
func (am *AllocatorManager) HandleTSORequest(dcLocation string, count uint32) (pdpb.Timestamp, error) {
am.RLock()
defer am.RUnlock()
if len(dcLocation) == 0 {
dcLocation = config.GlobalDCLocation
}
allocatorGroup, exist := am.allocatorGroups[dcLocation]
am.RUnlock()
if !exist {
err := errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, generate timestamp failed", dcLocation))
return pdpb.Timestamp{}, err
Expand Down Expand Up @@ -612,9 +616,8 @@ func (am *AllocatorManager) GetAllocators(filters ...AllocatorGroupFilter) []All
func (am *AllocatorManager) GetLocalAllocatorLeaders() ([]*LocalTSOAllocator, error) {
localAllocators := am.GetAllocators(
FilterDCLocation(config.GlobalDCLocation),
FilterUnavailableLeadership(),
FilterUninitialized())
localAllocatorLeaders := make([]*LocalTSOAllocator, len(localAllocators))
FilterUnavailableLeadership())
localAllocatorLeaders := make([]*LocalTSOAllocator, 0, len(localAllocators))
for _, localAllocator := range localAllocators {
localAllocatorLeader, ok := localAllocator.(*LocalTSOAllocator)
if !ok {
Expand Down
201 changes: 198 additions & 3 deletions server/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,21 @@
package tso

import (
"context"
"fmt"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/grpcutil"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/election"
"go.uber.org/zap"
"google.golang.org/grpc"
)

// Allocator is a Timestamp Oracle allocator.
Expand Down Expand Up @@ -46,17 +57,25 @@ type GlobalTSOAllocator struct {
// to determine whether a TSO request could be processed.
leadership *election.Leadership
timestampOracle *timestampOracle
// for global TSO synchronization
allocatorManager *AllocatorManager
// for gRPC use
localAllocatorConn struct {
sync.RWMutex
clientConns map[string]*grpc.ClientConn
}
}

// NewGlobalTSOAllocator creates a new global TSO allocator.
func NewGlobalTSOAllocator(
am *AllocatorManager,
leadership *election.Leadership,
rootPath string,
saveInterval time.Duration,
updatePhysicalInterval time.Duration,
maxResetTSGap func() time.Duration,
) Allocator {
return &GlobalTSOAllocator{
gta := &GlobalTSOAllocator{
leadership: leadership,
timestampOracle: &timestampOracle{
client: leadership.GetClient(),
Expand All @@ -65,7 +84,10 @@ func NewGlobalTSOAllocator(
updatePhysicalInterval: updatePhysicalInterval,
maxResetTSGap: maxResetTSGap,
},
allocatorManager: am,
}
gta.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn)
return gta
}

// Initialize will initialize the created global TSO allocator.
Expand All @@ -91,8 +113,181 @@ func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error {
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
// Todo: implement the synchronization algorithm for global TSO generation
return gta.timestampOracle.getTS(gta.leadership, count)
// To check if we have any dc-location configured in the cluster
dcLocationMap, err := gta.allocatorManager.GetClusterDCLocations()
if err != nil {
log.Error("get cluster dc-locations info failed", errs.ZapError(err))
return pdpb.Timestamp{}, err
}
// No dc-locations configured in the cluster
if len(dcLocationMap) == 0 {
return gta.timestampOracle.getTS(gta.leadership, count)
}
// Send maxTS to all Local TSO Allocator leaders to prewrite
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
maxTSO := &pdpb.Timestamp{}
// Collect the MaxTS with all Local TSO Allocator leaders first
if err = gta.syncMaxTS(ctx, dcLocationMap, maxTSO); err != nil {
return pdpb.Timestamp{}, err
}
maxTSO.Logical += int64(count)
// Sync the MaxTS with all Local TSO Allocator leaders then
if err := gta.syncMaxTS(ctx, dcLocationMap, maxTSO); err != nil {
return pdpb.Timestamp{}, err
nolouch marked this conversation as resolved.
Show resolved Hide resolved
}
var currentGlobalTSO pdpb.Timestamp
if currentGlobalTSO, err = gta.getCurrentTSO(); err != nil {
return pdpb.Timestamp{}, err
}
if tsoutil.CompareTimestamp(&currentGlobalTSO, maxTSO) < 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cloud this update guarantee atomicity?

Copy link
Member Author

@JmPotato JmPotato Oct 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not atomic here. But even if the currentGlobalTSO changed during the comparison, the later SetTSO() will guarantee the atomicity and make sure the TSO won't fallback.

// Update the global TSO in memory
if err := gta.SetTSO(tsoutil.GenerateTS(maxTSO)); err != nil {
return pdpb.Timestamp{}, err
}
}
return *maxTSO, nil
}

// dialTimeout bounds the context handed to grpcutil.GetClientConn when
// getOrCreateGRPCConn has to open a new connection.
const dialTimeout = 3 * time.Second

// rpcTimeout bounds each individual SyncMaxTS RPC issued by syncMaxTS.
const rpcTimeout = 3 * time.Second

func (gta *GlobalTSOAllocator) syncMaxTS(ctx context.Context, dcLocationMap map[string][]uint64, maxTSO *pdpb.Timestamp) error {
maxRetryCount := 1
for i := 0; i < maxRetryCount; i++ {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
// Collect all allocator leaders' client URLs
allocatorLeaders, err := gta.allocatorManager.GetLocalAllocatorLeaders()
if err != nil {
return err
}
leaderURLs := make([]string, 0, len(allocatorLeaders))
for _, allocator := range allocatorLeaders {
// Check if its client URLs are empty
if len(allocator.GetMember().GetClientUrls()) < 1 {
continue
}
leaderURL := allocator.GetMember().GetClientUrls()[0]
if slice.NoneOf(leaderURLs, func(i int) bool { return leaderURLs[i] == leaderURL }) {
leaderURLs = append(leaderURLs, leaderURL)
}
}
// Prepare to make RPC requests concurrently
respCh := make(chan *pdpb.SyncMaxTSResponse, len(leaderURLs))
errCh := make(chan error, len(leaderURLs))
var errList []error
wg := sync.WaitGroup{}
for _, leaderURL := range leaderURLs {
leaderConn, err := gta.getOrCreateGRPCConn(ctx, leaderURL)
if err != nil {
return err
}
wg.Add(1)
go func(ctx context.Context, conn *grpc.ClientConn, respCh chan<- *pdpb.SyncMaxTSResponse, errCh chan<- error) {
request := &pdpb.SyncMaxTSRequest{
Header: &pdpb.RequestHeader{
SenderId: gta.allocatorManager.member.ID(),
},
}
if maxTSO.GetPhysical() != 0 {
request.MaxTs = maxTSO
}
syncCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
resp, err := pdpb.NewPDClient(conn).SyncMaxTS(syncCtx, request)
cancel()
if err != nil {
errCh <- err
log.Error("sync max ts rpc failed, got an error", zap.String("local-allocator-leader-url", leaderConn.Target()), zap.Error(err))
}
respCh <- resp
if resp == nil {
log.Error("sync max ts rpc failed, got a nil response", zap.String("local-allocator-leader-url", leaderConn.Target()))
}
wg.Done()
}(ctx, leaderConn, respCh, errCh)
}
wg.Wait()
close(respCh)
close(errCh)
// If any error occurs, the synchronization process will fail
if err := <-errCh; err != nil {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
errList = append(errList, err)
}
if len(errList) > 0 {
return errs.ErrSyncMaxTS.FastGenWithCause(errList)
}
var syncedDCs []string
for resp := range respCh {
if resp == nil {
nolouch marked this conversation as resolved.
Show resolved Hide resolved
return errs.ErrSyncMaxTS.FastGenWithCause("got nil response")
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
}
syncedDCs = append(syncedDCs, resp.GetDcs()...)
// Compare and get the max one
if resp.GetMaxLocalTs() != nil && resp.GetMaxLocalTs().GetPhysical() != 0 {
if tsoutil.CompareTimestamp(resp.GetMaxLocalTs(), maxTSO) > 0 {
*maxTSO = *(resp.GetMaxLocalTs())
}
}
}
if !gta.checkSyncedDCs(dcLocationMap, syncedDCs) {
// Only retry one time when synchronization is incomplete
if maxRetryCount == 1 {
log.Warn("unsynced dc-locations found, will retry", zap.Strings("syncedDCs", syncedDCs))
maxRetryCount++
continue
}
return errs.ErrSyncMaxTS.FastGenWithCause(fmt.Sprintf("unsynced dc-locations found, synced dc-locations: %+v", syncedDCs))
}
}
return nil
}

// checkSyncedDCs reports whether every dc-location key of dcLocationMap
// appears in syncedDCs. The synced and the missing dc-locations are logged at
// info level on every call.
func (gta *GlobalTSOAllocator) checkSyncedDCs(dcLocationMap map[string][]uint64, syncedDCs []string) bool {
	missing := make([]string, 0)
	for dc := range dcLocationMap {
		absent := slice.NoneOf(syncedDCs, func(idx int) bool { return syncedDCs[idx] == dc })
		if absent {
			missing = append(missing, dc)
		}
	}
	log.Info("check unsynced dc-locations", zap.Strings("unsyncedDCs", missing), zap.Strings("syncedDCs", syncedDCs))
	return len(missing) == 0
}

// getOrCreateGRPCConn returns the cached gRPC connection for addr, creating
// and caching one when none exists yet.
//
// The cache is first consulted under the read lock. On a miss a new connection
// is created outside of any lock (bounded by dialTimeout), and the cache is
// checked again under the write lock: if another caller stored a connection
// for addr in the meantime, the fresh one is closed and the stored one is
// returned instead.
func (gta *GlobalTSOAllocator) getOrCreateGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) {
	gta.localAllocatorConn.RLock()
	conn, ok := gta.localAllocatorConn.clientConns[addr]
	gta.localAllocatorConn.RUnlock()
	if ok {
		return conn, nil
	}
	tlsCfg, err := gta.allocatorManager.securityConfig.ToTLSConfig()
	if err != nil {
		return nil, err
	}
	ctxWithTimeout, cancel := context.WithTimeout(ctx, dialTimeout)
	defer cancel()
	cc, err := grpcutil.GetClientConn(ctxWithTimeout, addr, tlsCfg)
	if err != nil {
		return nil, err
	}
	gta.localAllocatorConn.Lock()
	defer gta.localAllocatorConn.Unlock()
	if old, ok := gta.localAllocatorConn.clientConns[addr]; ok {
		// The redundant connection is discarded on a best-effort basis; a
		// failure to close it does not affect the connection being returned.
		_ = cc.Close()
		// Report the connection that is actually reused, not the closed one.
		log.Debug("use old connection", zap.String("target", old.Target()), zap.String("state", old.GetState().String()))
		return old, nil
	}
	gta.localAllocatorConn.clientConns[addr] = cc
	return cc, nil
}

// getCurrentTSO reads the physical and logical parts currently held by the
// timestamp oracle and returns them as a pdpb.Timestamp. It returns
// ErrGenerateTimestamp when the physical part is still typeutil.ZeroTime.
func (gta *GlobalTSOAllocator) getCurrentTSO() (pdpb.Timestamp, error) {
	physical, logical := gta.timestampOracle.getTSO()
	if physical != typeutil.ZeroTime {
		return *tsoutil.GenerateTimestamp(physical, uint64(logical)), nil
	}
	return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized")
}

// Reset is used to reset the TSO allocator.
Expand Down
5 changes: 2 additions & 3 deletions server/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,8 @@ func (lta *LocalTSOAllocator) WriteTSO(maxTS *pdpb.Timestamp) error {
if err != nil {
return err
}
// If current local TSO has already been greater than
// maxTS, then do not update it.
if currentTSO.Physical >= maxTS.Physical {
// If current local TSO has already been greater or equal to maxTS, then do not update it.
if tsoutil.CompareTimestamp(&currentTSO, maxTS) >= 0 {
return nil
}
return lta.SetTSO(tsoutil.GenerateTS(maxTS))
Expand Down
Loading