Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: add SyncMaxTS RPC method #2988

Merged
merged 22 commits into from
Sep 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0bfc627
Add PrewriteMaxTS and WriteMaxTS RPC methods
JmPotato Sep 18, 2020
6a4c7e2
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 18, 2020
e82150b
Validate the sender ID with leader ID
JmPotato Sep 18, 2020
35f06dd
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 18, 2020
6126ebb
Handle the conflict
JmPotato Sep 18, 2020
bfddb94
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 21, 2020
792521e
Add a log
JmPotato Sep 21, 2020
9dee186
Update RPC verification process
JmPotato Sep 21, 2020
56c2c5c
Set tso by using uint64 ts
JmPotato Sep 21, 2020
8d82103
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 23, 2020
1fa290b
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 23, 2020
1f1b404
Remove the prewritten TSO in memory
JmPotato Sep 23, 2020
d406988
Compact the RPC methods
JmPotato Sep 23, 2020
52463fb
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 24, 2020
622c66e
Add the leadership checking
JmPotato Sep 24, 2020
674ba8a
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 25, 2020
138c2d4
Add the synced dcs checking
JmPotato Sep 25, 2020
fd82d24
Rename the field
JmPotato Sep 25, 2020
e40f53c
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 25, 2020
8fe9232
Address comments
JmPotato Sep 25, 2020
18de392
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 27, 2020
0b0b1e1
Update kvproto and resolve the conflict
JmPotato Sep 27, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/errors v0.11.5-0.20200917111840-a15ef68f753d
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d
github.com/pingcap/kvproto v0.0.0-20200916031750-f9473f2c5379
github.com/pingcap/kvproto v0.0.0-20200927025644-73dc27044686
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/sysutil v0.0.0-20200715082929-4c47bcac246a
github.com/prometheus/client_golang v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200916031750-f9473f2c5379 h1:KAGE4PYxYLL/dnui3sRCcQHNpcpP5aMl0R/NKzATGgI=
github.com/pingcap/kvproto v0.0.0-20200916031750-f9473f2c5379/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200927025644-73dc27044686 h1:cf7TL5LMMPQew7vPOtvcfam6AyYxwu5uzcOrHMN8z7k=
github.com/pingcap/kvproto v0.0.0-20200927025644-73dc27044686/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
// tso errors
var (
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrInvalidTimestamp = errors.Normalize("invalid timestamp", errors.RFCCodeText("PD:tso:ErrInvalidTimestamp"))
Expand Down
18 changes: 15 additions & 3 deletions pkg/tsoutil/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,20 @@ func ParseTS(ts uint64) (time.Time, uint64) {

// ParseTimestamp parses `pdpb.Timestamp` to `time.Time`
func ParseTimestamp(ts pdpb.Timestamp) (time.Time, uint64) {
logical := uint64(ts.Logical)
physical := ts.Physical
physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds())
logical := uint64(ts.GetLogical())
physicalTime := time.Unix(ts.GetPhysical()/1000, ts.GetPhysical()%1000*time.Millisecond.Nanoseconds())
return physicalTime, logical
}

// GenerateTS generate an `uint64` TS by passing a `pdpb.Timestamp`.
func GenerateTS(ts *pdpb.Timestamp) uint64 {
return uint64(ts.GetPhysical())<<18 | uint64(ts.GetLogical())&0x3FFFF
}

// GenerateTimestamp generate a `pdpb.Timestamp` by passing `time.Time` and `uint64`
func GenerateTimestamp(physical time.Time, logical uint64) *pdpb.Timestamp {
return &pdpb.Timestamp{
Physical: physical.UnixNano() / int64(time.Millisecond),
Logical: int64(logical),
}
}
74 changes: 73 additions & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.tsoAllocatorManager.HandleTSORequest(config.GlobalDCLocation, count)
ts, err := s.tsoAllocatorManager.HandleTSORequest(request.GetDcLocation(), count)
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
Expand Down Expand Up @@ -908,3 +908,75 @@ func (s *Server) incompatibleVersion(tag string) *pdpb.ResponseHeader {
Message: msg,
})
}

// SyncMaxTS is a RPC method used to synchronize the timestamp of TSO between the
// Global TSO Allocator and multi Local TSO Allocator leaders. It contains two
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
// phases:
// 1. Collect timestamps among all Local TSO Allocator leaders, and choose the
// greatest one as MaxTS.
// 2. Send the MaxTS to all Local TSO Allocator leaders. They will compare MaxTS
// with its current TSO in memory to make sure their local TSOs are not less
// than MaxTS by writing MaxTS into memory to finish the global TSO synchronization.
func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest) (*pdpb.SyncMaxTSResponse, error) {
if err := s.validateInternalRequest(request.GetHeader()); err != nil {
return nil, err
}
tsoAllocatorManager := s.GetTSOAllocatorManager()
// Get all Local TSO Allocator leaders
allocatorLeaders, err := tsoAllocatorManager.GetLocalAllocatorLeaders()
if err != nil {
return nil, err
}
var processedDCs []string
if request.GetMaxTs() == nil || request.GetMaxTs().Physical == 0 {
// The first phase of synchronization: collect the max local ts
var maxLocalTS pdpb.Timestamp
for _, allocator := range allocatorLeaders {
// No longer leader, just skip here because
// the global allocator will check if all DCs are handled.
if !allocator.IsStillAllocatorLeader() {
continue
}
currentLocalTSO, err := allocator.GetCurrentTSO()
if err != nil {
return nil, err
}
if currentLocalTSO.Physical > maxLocalTS.Physical {
maxLocalTS.Physical = currentLocalTSO.Physical
}
processedDCs = append(processedDCs, allocator.GetDCLocation())
}
return &pdpb.SyncMaxTSResponse{
Header: s.header(),
MaxLocalTs: &maxLocalTS,
Dcs: processedDCs,
}, nil
}
// The second phase of synchronization: do the writing
for _, allocator := range allocatorLeaders {
if !allocator.IsStillAllocatorLeader() {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
continue
}
if err := allocator.WriteTSO(request.GetMaxTs()); err != nil {
return nil, err
}
processedDCs = append(processedDCs, allocator.GetDCLocation())
}
return &pdpb.SyncMaxTSResponse{
Header: s.header(),
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
Dcs: processedDCs,
}, nil
}

// validateInternalRequest checks if server is closed, which is used to validate
// the gRPC communication between PD servers internally.
func (s *Server) validateInternalRequest(header *pdpb.RequestHeader) error {
if s.IsClosed() {
return errors.WithStack(ErrNotStarted)
}
leaderID := s.GetLeader().GetMemberId()
if leaderID != header.GetSenderId() {
return status.Errorf(codes.FailedPrecondition, "mismatch leader id, need %d but got %d", leaderID, header.GetSenderId())
}
return nil
}
20 changes: 20 additions & 0 deletions server/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,9 @@ func (am *AllocatorManager) deleteAllocatorGroup(dcLocation string) {
func (am *AllocatorManager) HandleTSORequest(dcLocation string, count uint32) (pdpb.Timestamp, error) {
am.RLock()
defer am.RUnlock()
if len(dcLocation) == 0 {
dcLocation = config.GlobalDCLocation
}
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
allocatorGroup, exist := am.allocatorGroups[dcLocation]
if !exist {
err := errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, generate timestamp failed", dcLocation))
Expand Down Expand Up @@ -443,3 +446,20 @@ func (am *AllocatorManager) GetAllocators(filters ...AllocatorGroupFilter) []All
}
return allocators
}

// GetLocalAllocatorLeaders returns all Local TSO Allocator leaders this server holds.
func (am *AllocatorManager) GetLocalAllocatorLeaders() ([]*LocalTSOAllocator, error) {
localAllocators := am.GetAllocators(
FilterDCLocation(config.GlobalDCLocation),
FilterUnavailableLeadership(),
FilterUninitialized())
localAllocatorLeaders := make([]*LocalTSOAllocator, len(localAllocators))
for _, localAllocator := range localAllocators {
localAllocatorLeader, ok := localAllocator.(*LocalTSOAllocator)
if !ok {
return nil, errs.ErrGetLocalAllocator.FastGenByArgs("invalid local tso allocator found")
}
localAllocatorLeaders = append(localAllocatorLeaders, localAllocatorLeader)
}
return localAllocatorLeaders, nil
}
30 changes: 30 additions & 0 deletions server/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/election"
"github.com/tikv/pd/server/member"
"go.uber.org/zap"
Expand Down Expand Up @@ -60,6 +62,11 @@ func NewLocalTSOAllocator(member *member.Member, leadership *election.Leadership
}
}

// GetDCLocation returns the local allocator's dc-location.
func (lta *LocalTSOAllocator) GetDCLocation() string {
return lta.dcLocation
}

// Initialize will initialize the created local TSO allocator.
func (lta *LocalTSOAllocator) Initialize() error {
return lta.timestampOracle.SyncTimestamp(lta.leadership)
Expand Down Expand Up @@ -117,6 +124,29 @@ func (lta *LocalTSOAllocator) GetMember() *pdpb.Member {
return lta.member.Member()
}

// GetCurrentTSO returns current TSO in memory.
func (lta *LocalTSOAllocator) GetCurrentTSO() (pdpb.Timestamp, error) {
currentPhysical, currentLogical := lta.timestampOracle.getTSO()
if currentPhysical == typeutil.ZeroTime {
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized")
}
return *tsoutil.GenerateTimestamp(currentPhysical, uint64(currentLogical)), nil
}

// WriteTSO is used to set the maxTS as current TSO in memory.
func (lta *LocalTSOAllocator) WriteTSO(maxTS *pdpb.Timestamp) error {
currentTSO, err := lta.GetCurrentTSO()
if err != nil {
return err
}
// If current local TSO has already been greater than
// maxTS, then do not update it.
if currentTSO.Physical >= maxTS.Physical {
return nil
}
return lta.SetTSO(tsoutil.GenerateTS(maxTS))
}

// EnableAllocatorLeader sets the Local TSO Allocator itself to a leader.
func (lta *LocalTSOAllocator) EnableAllocatorLeader() {
lta.setAllocatorLeader(lta.member.Member())
Expand Down