Skip to content

Commit

Permalink
Implement the Global TSO optimization
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Mar 8, 2021
1 parent 0e15869 commit 2d6af6d
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 128 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (

replace (
github.com/oleiade/reflections => github.com/oleiade/reflections v1.0.1
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6 => github.com/JmPotato/kvproto v0.0.0-20210222031724-03117d594669
github.com/sirupsen/logrus => github.com/sirupsen/logrus v1.2.0
go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5
)
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/JmPotato/kvproto v0.0.0-20210222031724-03117d594669 h1:kHqfGLVxYru5HNuhoUlqAcZMP0g3+B204wEHsEbR7gA=
github.com/JmPotato/kvproto v0.0.0-20210222031724-03117d594669/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
Expand Down Expand Up @@ -327,8 +329,6 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6 h1:lNGXD00uNXOKMM2pnTe9XvUv3IOEOtFhqNQljlTDZKc=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down Expand Up @@ -410,7 +410,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.0 h1:jlIyCplCJFULU/01vCkhKuTyc3OorI3bJFuw6obfgho=
github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
37 changes: 21 additions & 16 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,10 +984,9 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest)
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
}
var processedDCs []string
if request.GetMaxTs() == nil || request.GetMaxTs().GetPhysical() == 0 {
// The first phase of synchronization: collect the max local ts
var maxLocalTS pdpb.Timestamp
var maxLocalTS pdpb.Timestamp
if !request.GetSkipCheck() {
syncedDCs := make([]string, 0)
for _, allocator := range allocatorLeaders {
// No longer leader, just skip here because
// the global allocator will check if all DCs are handled.
Expand All @@ -1001,27 +1000,34 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest)
if tsoutil.CompareTimestamp(&currentLocalTSO, &maxLocalTS) > 0 {
maxLocalTS = currentLocalTSO
}
processedDCs = append(processedDCs, allocator.GetDCLocation())
syncedDCs = append(syncedDCs, allocator.GetDCLocation())
}
if tsoutil.CompareTimestamp(request.GetMaxTs(), &maxLocalTS) <= 0 {
// Make the MaxLocalTs bigger to distinguish it from the estimated one
if tsoutil.CompareTimestamp(request.GetMaxTs(), &maxLocalTS) == 0 {
maxLocalTS.Physical += tso.UpdateTimestampGuard.Milliseconds()
maxLocalTS.Logical = 0
}
return &pdpb.SyncMaxTSResponse{
Header: s.header(),
MaxLocalTs: &maxLocalTS,
SyncedDcs: syncedDCs,
}, nil
}
return &pdpb.SyncMaxTSResponse{
Header: s.header(),
MaxLocalTs: &maxLocalTS,
Dcs: processedDCs,
}, nil
}
// The second phase of synchronization: do the writing
syncedDCs := make([]string, 0)
for _, allocator := range allocatorLeaders {
if !allocator.IsAllocatorLeader() {
continue
}
if err := allocator.WriteTSO(request.GetMaxTs()); err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
}
processedDCs = append(processedDCs, allocator.GetDCLocation())
syncedDCs = append(syncedDCs, allocator.GetDCLocation())
}
return &pdpb.SyncMaxTSResponse{
Header: s.header(),
Dcs: processedDCs,
Header: s.header(),
SyncedDcs: syncedDCs,
}, nil
}

Expand All @@ -1038,8 +1044,7 @@ func (s *Server) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsReq
}, nil
}

// GetDCLocationInfo gets the dc-location info of the given dc-location from PD leader's TSO allocator manager, and will collect current max
// Local TSO if the NeedSyncMaxTSO flag in dc-location info is true.
// GetDCLocationInfo gets the dc-location info of the given dc-location from PD leader's TSO allocator manager.
func (s *Server) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCLocationInfoRequest) (*pdpb.GetDCLocationInfoResponse, error) {
var err error
if err = s.validateInternalRequest(request.GetHeader(), false); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,7 @@ func (am *AllocatorManager) GetMaxLocalTSO(ctx context.Context) (*pdpb.Timestamp
}
}
maxTSO := &pdpb.Timestamp{}
if err := globalAllocator.(*GlobalTSOAllocator).SyncMaxTS(ctx, clusterDCLocations, maxTSO); err != nil {
if err := globalAllocator.(*GlobalTSOAllocator).SyncMaxTS(ctx, clusterDCLocations, false, maxTSO); err != nil {
return &pdpb.Timestamp{}, err
}
return maxTSO, nil
Expand Down
Loading

0 comments on commit 2d6af6d

Please sign in to comment.