Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: add SyncMaxTS RPC method #2988

Merged
merged 22 commits into from
Sep 27, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0bfc627
Add PrewriteMaxTS and WriteMaxTS RPC methods
JmPotato Sep 18, 2020
6a4c7e2
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 18, 2020
e82150b
Validate the sender ID with leader ID
JmPotato Sep 18, 2020
35f06dd
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 18, 2020
6126ebb
Handle the conflict
JmPotato Sep 18, 2020
bfddb94
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 21, 2020
792521e
Add a log
JmPotato Sep 21, 2020
9dee186
Update RPC verification process
JmPotato Sep 21, 2020
56c2c5c
Set tso by using uint64 ts
JmPotato Sep 21, 2020
8d82103
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 23, 2020
1fa290b
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 23, 2020
1f1b404
Remove the prewritten TSO in memory
JmPotato Sep 23, 2020
d406988
Compact the RPC methods
JmPotato Sep 23, 2020
52463fb
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 24, 2020
622c66e
Add the leadership checking
JmPotato Sep 24, 2020
674ba8a
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 25, 2020
138c2d4
Add the synced dcs checking
JmPotato Sep 25, 2020
fd82d24
Rename the field
JmPotato Sep 25, 2020
e40f53c
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 25, 2020
8fe9232
Address comments
JmPotato Sep 25, 2020
18de392
Merge branch 'master' of github.com:tikv/pd into tso_grpc
JmPotato Sep 27, 2020
0b0b1e1
Update kvproto and resolve the conflict
JmPotato Sep 27, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/tikv/pd

go 1.13

replace github.com/pingcap/kvproto v0.0.0-20200916031750-f9473f2c5379 => github.com/JmPotato/kvproto v0.0.0-20200918074858-edae97a0717e

require (
github.com/BurntSushi/toml v0.3.1
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/JmPotato/kvproto v0.0.0-20200918074858-edae97a0717e h1:B/FobBoTw/6TLutzw6tAgcVT8/t4piyOJn/twWG+SmA=
github.com/JmPotato/kvproto v0.0.0-20200918074858-edae97a0717e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
Expand Down Expand Up @@ -300,8 +302,6 @@ github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200916031750-f9473f2c5379 h1:KAGE4PYxYLL/dnui3sRCcQHNpcpP5aMl0R/NKzATGgI=
github.com/pingcap/kvproto v0.0.0-20200916031750-f9473f2c5379/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
// tso errors
var (
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrInvalidTimestamp = errors.Normalize("invalid timestamp", errors.RFCCodeText("PD:tso:ErrInvalidTimestamp"))
Expand Down
97 changes: 96 additions & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.tsoAllocatorManager.HandleTSORequest(config.GlobalDCLocation, count)
ts, err := s.tsoAllocatorManager.HandleTSORequest(request.GetDcLocation(), count)
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
Expand Down Expand Up @@ -908,3 +908,98 @@ func (s *Server) incompatibleVersion(tag string) *pdpb.ResponseHeader {
Message: msg,
})
}

// PrewriteMaxTS is the first part of 2PC for global MaxTS synchronization,
// it compares the MaxTS received with each Local TSO Allocator leader's TSO
// and prewrite it into memory to wait for the next phase of formal writing.
func (s *Server) PrewriteMaxTS(ctx context.Context, request *pdpb.PrewriteMaxTSRequest) (*pdpb.PrewriteMaxTSResponse, error) {
if err := s.validateInternalRequest(request.GetHeader()); err != nil {
return nil, err
}
tsoAllocatorManager := s.GetTSOAllocatorManager()
// Get all Local TSO Allocator leaders
allocatorLeaders, err := tsoAllocatorManager.GetLocalAllocatorLeaders()
if err != nil {
return nil, err
}
for _, allocator := range allocatorLeaders {
currentLocalTSO, err := allocator.GetCurrentTSO()
if err != nil {
return nil, err
}
// Validate whether the MaxTs is greater than all other local TSOs
if currentLocalTSO.Physical > request.MaxTs.Physical {
return &pdpb.PrewriteMaxTSResponse{
Header: s.header(),
Prewritten: false,
MaxLocalTs: &currentLocalTSO,
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
}, nil
}
// Do the prewriting
allocator.PrewriteTSO(request.MaxTs)
}
return &pdpb.PrewriteMaxTSResponse{
Header: s.header(),
Prewritten: true,
}, nil
}

// WriteMaxTS is the second part of 2PC for global MaxTS synchronization,
// it compares the MaxTS received with each Local TSO Allocator leader's
// prewritten TSO in memory and write it into memory to finish the global
// TSO synchronization progress.
func (s *Server) WriteMaxTS(ctx context.Context, request *pdpb.WriteMaxTSRequest) (*pdpb.WriteMaxTSResponse, error) {
if err := s.validateInternalRequest(request.GetHeader()); err != nil {
return nil, err
}
tsoAllocatorManager := s.GetTSOAllocatorManager()
// Get all Local TSO Allocator leaders
allocatorLeaders, err := tsoAllocatorManager.GetLocalAllocatorLeaders()
if err != nil {
return nil, err
}
for _, allocator := range allocatorLeaders {
// Validate whether the prewritten TSO is consistent
currentPrewrittenTSO := allocator.GetPrewrittenTSO()
if currentPrewrittenTSO == nil {
return &pdpb.WriteMaxTSResponse{
Header: s.errorHeader(&pdpb.Error{
Type: pdpb.ErrorType_UNKNOWN,
Message: "prewritten tso doesn't exist",
}),
Written: false,
}, nil

}
if currentPrewrittenTSO.Physical != request.MaxTs.Physical {
return &pdpb.WriteMaxTSResponse{
Header: s.errorHeader(&pdpb.Error{
Type: pdpb.ErrorType_UNKNOWN,
Message: "prewritten tso is not same with the MaxTS",
}),
Written: false,
}, nil
}
// Do the writing
if err := allocator.WriteTSO(); err != nil {
return nil, err
}
}
return &pdpb.WriteMaxTSResponse{
Header: s.header(),
Written: true,
}, nil
}

// validateInternalRequest checks if server is closed, which is used to validate
// the gRPC communication between PD servers internally.
func (s *Server) validateInternalRequest(header *pdpb.RequestHeader) error {
if s.IsClosed() {
return errors.WithStack(ErrNotStarted)
}
leaderID := s.GetLeader().GetMemberId()
if leaderID != header.GetSenderId() {
return status.Errorf(codes.FailedPrecondition, "mismatch leader id, need %d but got %d", leaderID, header.GetSenderId())
}
return nil
}
20 changes: 20 additions & 0 deletions server/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
func (am *AllocatorManager) HandleTSORequest(dcLocation string, count uint32) (pdpb.Timestamp, error) {
am.RLock()
defer am.RUnlock()
if len(dcLocation) == 0 {
dcLocation = config.GlobalDCLocation
}
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
allocatorGroup, exist := am.allocatorGroups[dcLocation]
if !exist {
err := errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, generate timestamp failed", dcLocation))
Expand Down Expand Up @@ -387,3 +390,20 @@ func (am *AllocatorManager) GetAllocators(filters ...AllocatorGroupFilter) []All
}
return allocators
}

// GetLocalAllocatorLeaders returns all Local TSO Allocator leaders this server holds.
func (am *AllocatorManager) GetLocalAllocatorLeaders() ([]*LocalTSOAllocator, error) {
localAllocators := am.GetAllocators(
FilterDCLocation(config.GlobalDCLocation),
FilterUnavailableLeadership(),
FilterUninitialized())
localAllocatorLeaders := make([]*LocalTSOAllocator, len(localAllocators))
for _, localAllocator := range localAllocators {
localAllocatorLeader, ok := localAllocator.(*LocalTSOAllocator)
if !ok {
return nil, errs.ErrGetLocalAllocator.FastGenByArgs("invalid local tso allocator found")
}
localAllocatorLeaders = append(localAllocatorLeaders, localAllocatorLeader)
}
return localAllocatorLeaders, nil
}
49 changes: 49 additions & 0 deletions server/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/election"
"github.com/tikv/pd/server/member"
"go.uber.org/zap"
Expand All @@ -33,6 +34,10 @@ type LocalTSOAllocator struct {
// leadership is used to campaign the corresponding DC's Local TSO Allocator.
leadership *election.Leadership
timestampOracle *timestampOracle
// prewrittenTSO is used to store the MaxTS from Global TSO Allocator that
// is ready to write the TSO in memory.
// Todo: gc for unused prewrittenTSO
prewrittenTSO atomic.Value // stored as *pdpb.Timestamp
// for election use, notice that the leadership that member holds is
// the leadership for PD leader. Local TSO Allocator's leadership is for the
// election of Local TSO Allocator leader among several PD servers and
Expand Down Expand Up @@ -117,6 +122,50 @@ func (lta *LocalTSOAllocator) GetMember() *pdpb.Member {
return lta.member.Member()
}

// GetCurrentTSO returns current TSO in memory.
func (lta *LocalTSOAllocator) GetCurrentTSO() (pdpb.Timestamp, error) {
var currentTSO pdpb.Timestamp
current := (*atomicObject)(atomic.LoadPointer(&lta.timestampOracle.tso))
if current == nil || current.physical == typeutil.ZeroTime {
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized")
}
currentTSO.Physical = current.physical.UnixNano() / int64(time.Millisecond)
currentTSO.Logical = current.logical
return currentTSO, nil
}

// PrewriteTSO is used to prewrite the MaxTS into memory.
func (lta *LocalTSOAllocator) PrewriteTSO(maxTS *pdpb.Timestamp) {
lta.prewrittenTSO.Store(maxTS)
}

// GetPrewrittenTSO is used to return the current prewrittenTSO in memory.
func (lta *LocalTSOAllocator) GetPrewrittenTSO() *pdpb.Timestamp {
prewrittenTSO := lta.prewrittenTSO.Load()
if prewrittenTSO == nil {
return nil
}
return prewrittenTSO.(*pdpb.Timestamp)
}

// WriteTSO is used to set the prewrittenTSO as current TSO in memory.
func (lta *LocalTSOAllocator) WriteTSO() error {
prewrittenTSO := lta.GetPrewrittenTSO()
if prewrittenTSO == nil {
return nil
}
currentTSO, err := lta.GetCurrentTSO()
if err != nil {
return err
}
// If current local TSO has already been greater than prewritten TSO.
// then do not update
if currentTSO.Physical >= prewrittenTSO.Physical {
return nil
}
return lta.SetTSO(uint64(prewrittenTSO.Physical))
}

// EnableAllocatorLeader sets the Local TSO Allocator itself to a leader.
func (lta *LocalTSOAllocator) EnableAllocatorLeader() {
lta.setAllocatorLeader(lta.member.Member())
Expand Down