Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: implement the Global TSO optimization #3390

Merged
merged 32 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2d6af6d
Implement the Global TSO optimization
JmPotato Mar 8, 2021
4277724
Merge the upstream codes
JmPotato Mar 14, 2021
f2d2461
Merge branch 'master' of github.com:tikv/pd into refine_global_tso
JmPotato Mar 16, 2021
3fcfbbd
Merge the upstream codes
JmPotato Mar 16, 2021
0e874a2
Merge the upstream branch
JmPotato Apr 2, 2021
67035bd
Add more comments
JmPotato Apr 7, 2021
87fb69a
Merge branch 'master' of github.com:tikv/pd into refine_global_tso
JmPotato Apr 7, 2021
4b1b8c5
Add back the missing updateTime update statement
JmPotato Apr 7, 2021
7709d7a
Merge branch 'master' of github.com:tikv/pd into refine_global_tso
JmPotato Apr 7, 2021
7c41b72
Refine codes and comments
JmPotato Apr 7, 2021
e77e807
Update kvproto version
JmPotato Apr 7, 2021
01da199
Refine some comments
JmPotato Apr 8, 2021
c109097
Add a switch for Global TSO estimation
JmPotato Apr 9, 2021
d642f6f
Merge branch 'master' of github.com:tikv/pd into refine_global_tso
JmPotato Apr 12, 2021
2f8d805
Address comments
JmPotato Apr 12, 2021
cac0534
Fix the problem of physical time advancing too fast
JmPotato Apr 19, 2021
8e63c25
Fix the bugs of Local TSO physical time advancing too fast and TSO fa…
JmPotato Apr 19, 2021
b18cf79
Reduce unnecessary memory usage
JmPotato Apr 19, 2021
d5dc5dc
Reduce unnecessary memory usage
JmPotato Apr 19, 2021
dfc0cc9
Merge branch 'master' of github.com:tikv/pd into refine_global_tso
JmPotato Apr 19, 2021
39ac3e2
Fix the bug that tsoObject may be reset to a smaller one
JmPotato Apr 20, 2021
ab0b11e
Fix the static check
JmPotato Apr 20, 2021
453f8e6
Fix typos
JmPotato Apr 20, 2021
153b6cb
Merge the latest fix
JmPotato Apr 20, 2021
aef8ce7
Merge the upstream branch
JmPotato Apr 22, 2021
806e1b4
Address the comments
JmPotato Apr 28, 2021
06638eb
Merge the upstream branch
JmPotato May 18, 2021
e426017
Address the comment
JmPotato May 18, 2021
ce90449
Remove enableGlobalTSOEstimation switch
JmPotato May 19, 2021
6545f32
Resolve the conflict
JmPotato May 19, 2021
19b53fd
Update kvproto
JmPotato May 27, 2021
6c48cb6
Merge branch 'master' into refine_global_tso
ti-chi-bot May 27, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20210510050259-8f2b3a06beef
github.com/pingcap/kvproto v0.0.0-20210527074428-73468940541b
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3
github.com/pingcap/tidb-dashboard v0.0.0-20210512074702-4ee3e3909d5e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210510050259-8f2b3a06beef h1:mU05rF9VQn0TspHenP4ipYsiWU+wv1bEM9AEnd3UG3g=
github.com/pingcap/kvproto v0.0.0-20210510050259-8f2b3a06beef/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210527074428-73468940541b h1:NI+3/QYkQGTS6q1GTw3rapNu3l03K71+shL03r8/OOo=
github.com/pingcap/kvproto v0.0.0-20210527074428-73468940541b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8=
Expand Down
49 changes: 22 additions & 27 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,32 +1274,25 @@ func (s *Server) incompatibleVersion(tag string) *pdpb.ResponseHeader {
})
}

// SyncMaxTS is a RPC method used to synchronize the timestamp of TSO between the
// Global TSO Allocator and multi Local TSO Allocator leaders. It contains two
// phases:
// 1. Collect timestamps among all Local TSO Allocator leaders, and choose the
// greatest one as MaxTS.
// 2. Send the MaxTS to all Local TSO Allocator leaders. They will compare MaxTS
// with its current TSO in memory to make sure their local TSOs are not less
// than MaxTS by writing MaxTS into memory to finish the global TSO synchronization.
// SyncMaxTS will check whether MaxTS is the biggest one among all Local TSOs this PD is holding when skipCheck is set,
// and write it into all Local TSO Allocators then if it's indeed the biggest one.
func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest) (*pdpb.SyncMaxTSResponse, error) {
if err := s.validateInternalRequest(request.GetHeader(), true); err != nil {
return nil, err
}
tsoAllocatorManager := s.GetTSOAllocatorManager()
// There is no dc-location found in this server, return err.
if len(tsoAllocatorManager.GetClusterDCLocations()) == 0 {
return nil, status.Errorf(codes.Unknown, "empty cluster dc-Location found, checker may not work properly")
if tsoAllocatorManager.GetClusterDCLocationsNumber() == 0 {
return nil, status.Errorf(codes.Unknown, "empty cluster dc-location found, checker may not work properly")
}
// Get all Local TSO Allocator leaders
allocatorLeaders, err := tsoAllocatorManager.GetHoldingLocalAllocatorLeaders()
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
}
processedDCs := make([]string, 0, len(allocatorLeaders))
if request.GetMaxTs() == nil || request.GetMaxTs().GetPhysical() == 0 {
// The first phase of synchronization: collect the max local ts
var maxLocalTS pdpb.Timestamp
if !request.GetSkipCheck() {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
var maxLocalTS *pdpb.Timestamp
syncedDCs := make([]string, 0, len(allocatorLeaders))
for _, allocator := range allocatorLeaders {
// No longer leader, just skip here because
// the global allocator will check if all DCs are handled.
Expand All @@ -1310,30 +1303,33 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest)
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
}
if tsoutil.CompareTimestamp(&currentLocalTSO, &maxLocalTS) > 0 {
if tsoutil.CompareTimestamp(currentLocalTSO, maxLocalTS) > 0 {
maxLocalTS = currentLocalTSO
}
processedDCs = append(processedDCs, allocator.GetDCLocation())
syncedDCs = append(syncedDCs, allocator.GetDCLocation())
}
// Found a bigger maxLocalTS, return it directly.
if tsoutil.CompareTimestamp(maxLocalTS, request.GetMaxTs()) > 0 {
return &pdpb.SyncMaxTSResponse{
Header: s.header(),
MaxLocalTs: maxLocalTS,
SyncedDcs: syncedDCs,
}, nil
}
return &pdpb.SyncMaxTSResponse{
Header: s.header(),
MaxLocalTs: &maxLocalTS,
Dcs: processedDCs,
}, nil
}
// The second phase of synchronization: do the writing
syncedDCs := make([]string, 0, len(allocatorLeaders))
for _, allocator := range allocatorLeaders {
if !allocator.IsAllocatorLeader() {
continue
}
if err := allocator.WriteTSO(request.GetMaxTs()); err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
}
processedDCs = append(processedDCs, allocator.GetDCLocation())
syncedDCs = append(syncedDCs, allocator.GetDCLocation())
}
return &pdpb.SyncMaxTSResponse{
Header: s.header(),
Dcs: processedDCs,
Header: s.header(),
SyncedDcs: syncedDCs,
}, nil
}

Expand All @@ -1360,8 +1356,7 @@ func (s *Server) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsReq
}, nil
}

// GetDCLocationInfo gets the dc-location info of the given dc-location from PD leader's TSO allocator manager, and will collect current max
// Local TSO if the NeedSyncMaxTSO flag in dc-location info is true.
// GetDCLocationInfo gets the dc-location info of the given dc-location from PD leader's TSO allocator manager.
func (s *Server) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCLocationInfoRequest) (*pdpb.GetDCLocationInfoResponse, error) {
var err error
if err = s.validateInternalRequest(request.GetHeader(), false); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ func (am *AllocatorManager) GetMaxLocalTSO(ctx context.Context) (*pdpb.Timestamp
if err != nil {
return nil, err
}
if err := globalAllocator.(*GlobalTSOAllocator).SyncMaxTS(ctx, clusterDCLocations, maxTSO); err != nil {
if err := globalAllocator.(*GlobalTSOAllocator).SyncMaxTS(ctx, clusterDCLocations, maxTSO, false); err != nil {
return nil, err
}
return maxTSO, nil
Expand Down
Loading