Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: implement the Global TSO optimization #3390

Merged
merged 32 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2d6af6d
Implement the Global TSO optimization
JmPotato Mar 8, 2021
4277724
Merge the upstream codes
JmPotato Mar 14, 2021
f2d2461
Merge branch 'master' of github.com:tikv/pd into refine_global_tso
JmPotato Mar 16, 2021
3fcfbbd
Merge the upstream codes
JmPotato Mar 16, 2021
0e874a2
Merge the upstream branch
JmPotato Apr 2, 2021
67035bd
Add more comments
JmPotato Apr 7, 2021
87fb69a
Merge branch 'master' of github.com:tikv/pd into refine_global_tso
JmPotato Apr 7, 2021
4b1b8c5
Add back the missing updateTime update statement
JmPotato Apr 7, 2021
7709d7a
Merge branch 'master' of github.com:tikv/pd into refine_global_tso
JmPotato Apr 7, 2021
7c41b72
Refine codes and comments
JmPotato Apr 7, 2021
e77e807
Update kvproto version
JmPotato Apr 7, 2021
01da199
Refine some comments
JmPotato Apr 8, 2021
c109097
Add a switch for Global TSO estimation
JmPotato Apr 9, 2021
d642f6f
Merge branch 'master' of github.com:tikv/pd into refine_global_tso
JmPotato Apr 12, 2021
2f8d805
Address comments
JmPotato Apr 12, 2021
cac0534
Fix the problem of physical time advancing too fast
JmPotato Apr 19, 2021
8e63c25
Fix the bugs of Local TSO physical time advancing too fast and TSO fa…
JmPotato Apr 19, 2021
b18cf79
Reduce unnecessary memory usage
JmPotato Apr 19, 2021
d5dc5dc
Reduce unnecessary memory usage
JmPotato Apr 19, 2021
dfc0cc9
Merge branch 'master' of github.com:tikv/pd into refine_global_tso
JmPotato Apr 19, 2021
39ac3e2
Fix the bug that tsoObject may be reset to a smaller one
JmPotato Apr 20, 2021
ab0b11e
Fix the static check
JmPotato Apr 20, 2021
453f8e6
Fix typos
JmPotato Apr 20, 2021
153b6cb
Merge the latest fix
JmPotato Apr 20, 2021
aef8ce7
Merge the upstream branch
JmPotato Apr 22, 2021
806e1b4
Address the comments
JmPotato Apr 28, 2021
06638eb
Merge the upstream branch
JmPotato May 18, 2021
e426017
Address the comment
JmPotato May 18, 2021
ce90449
Remove enableGlobalTSOEstimation switch
JmPotato May 19, 2021
6545f32
Resolve the conflict
JmPotato May 19, 2021
19b53fd
Update kvproto
JmPotato May 27, 2021
6c48cb6
Merge branch 'master' into refine_global_tso
ti-chi-bot May 27, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,8 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

// Fix panic in unit test with go >= 1.14, ref: etcd-io/bbolt#201 https://github.com/etcd-io/bbolt/pull/201
replace go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5
replace (
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6 => github.com/JmPotato/kvproto v0.0.0-20210407170524-c0d89fd1a7df
// Fix panic in unit test with go >= 1.14, ref: etcd-io/bbolt#201 https://github.com/etcd-io/bbolt/pull/201
go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/JmPotato/kvproto v0.0.0-20210407170524-c0d89fd1a7df h1:C+1MdA1d/5U/97BK4LSETMinpu5z+JwHraaqNYQDpPo=
github.com/JmPotato/kvproto v0.0.0-20210407170524-c0d89fd1a7df/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
Expand Down Expand Up @@ -321,8 +323,6 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6 h1:lNGXD00uNXOKMM2pnTe9XvUv3IOEOtFhqNQljlTDZKc=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8=
Expand Down
53 changes: 25 additions & 28 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,42 +1274,37 @@ func (s *Server) incompatibleVersion(tag string) *pdpb.ResponseHeader {
})
}

// SyncMaxTS is a RPC method used to synchronize the timestamp of TSO between the
// Global TSO Allocator and multi Local TSO Allocator leaders. It contains two
// phases:
// 1. Collect timestamps among all Local TSO Allocator leaders, and choose the
// greatest one as MaxTS.
// 2. Send the MaxTS to all Local TSO Allocator leaders. They will compare MaxTS
// with its current TSO in memory to make sure their local TSOs are not less
// than MaxTS by writing MaxTS into memory to finish the global TSO synchronization.
// SyncMaxTS will check whether MaxTS is the biggest one among all Local TSOs this PD is holding when skipCheck is set,
// and write it into all Local TSO Allocators then if it's indeed the biggest one.
func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest) (*pdpb.SyncMaxTSResponse, error) {
// Check the connection forwarding.
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
forwardedHost := getForwardedHost(ctx)
if !s.isLocalRequest(forwardedHost) {
// In theory, it will not run to here because CollectMaxTS is an internal request.
client, err := s.getDelegateClient(ctx, forwardedHost)
if err != nil {
return nil, err
}
ctx = grpcutil.ResetForwardContext(ctx)
return pdpb.NewPDClient(client).SyncMaxTS(ctx, request)
}

// Validate the internal request header.
if err := s.validateInternalRequest(request.GetHeader(), true); err != nil {
return nil, err
}
tsoAllocatorManager := s.GetTSOAllocatorManager()
// There is no dc-location found in this server, return err.
if len(tsoAllocatorManager.GetClusterDCLocations()) == 0 {
return nil, status.Errorf(codes.Unknown, "empty cluster dc-Location found, checker may not work properly")
if tsoAllocatorManager.GetClusterDCLocationsNumber() == 0 {
return nil, status.Errorf(codes.Unknown, "empty cluster dc-location found, checker may not work properly")
}
// Get all Local TSO Allocator leaders
allocatorLeaders, err := tsoAllocatorManager.GetHoldingLocalAllocatorLeaders()
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
}
var processedDCs []string
if request.GetMaxTs() == nil || request.GetMaxTs().GetPhysical() == 0 {
// The first phase of synchronization: collect the max local ts
var maxLocalTS pdpb.Timestamp
var maxLocalTS *pdpb.Timestamp
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
if !request.GetSkipCheck() {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
syncedDCs := make([]string, 0, len(allocatorLeaders))
for _, allocator := range allocatorLeaders {
// No longer leader, just skip here because
// the global allocator will check if all DCs are handled.
Expand All @@ -1320,30 +1315,33 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest)
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
}
if tsoutil.CompareTimestamp(&currentLocalTSO, &maxLocalTS) > 0 {
if tsoutil.CompareTimestamp(currentLocalTSO, maxLocalTS) > 0 {
maxLocalTS = currentLocalTSO
}
processedDCs = append(processedDCs, allocator.GetDCLocation())
syncedDCs = append(syncedDCs, allocator.GetDCLocation())
}
// Found a bigger maxLocalTS, return it directly.
if tsoutil.CompareTimestamp(maxLocalTS, request.GetMaxTs()) > 0 {
return &pdpb.SyncMaxTSResponse{
Header: s.header(),
MaxLocalTs: maxLocalTS,
SyncedDcs: syncedDCs,
}, nil
}
return &pdpb.SyncMaxTSResponse{
Header: s.header(),
MaxLocalTs: &maxLocalTS,
Dcs: processedDCs,
}, nil
}
// The second phase of synchronization: do the writing
syncedDCs := make([]string, 0, len(allocatorLeaders))
for _, allocator := range allocatorLeaders {
if !allocator.IsAllocatorLeader() {
continue
}
if err := allocator.WriteTSO(request.GetMaxTs()); err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
}
processedDCs = append(processedDCs, allocator.GetDCLocation())
syncedDCs = append(syncedDCs, allocator.GetDCLocation())
}
return &pdpb.SyncMaxTSResponse{
Header: s.header(),
Dcs: processedDCs,
Header: s.header(),
SyncedDcs: syncedDCs,
}, nil
}

Expand All @@ -1370,8 +1368,7 @@ func (s *Server) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsReq
}, nil
}

// GetDCLocationInfo gets the dc-location info of the given dc-location from PD leader's TSO allocator manager, and will collect current max
// Local TSO if the NeedSyncMaxTSO flag in dc-location info is true.
// GetDCLocationInfo gets the dc-location info of the given dc-location from PD leader's TSO allocator manager.
func (s *Server) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCLocationInfoRequest) (*pdpb.GetDCLocationInfoResponse, error) {
forwardedHost := getForwardedHost(ctx)
if !s.isLocalRequest(forwardedHost) {
Expand Down
2 changes: 1 addition & 1 deletion server/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,7 @@ func (am *AllocatorManager) GetMaxLocalTSO(ctx context.Context) (*pdpb.Timestamp
}
}
maxTSO := &pdpb.Timestamp{}
if err := globalAllocator.(*GlobalTSOAllocator).SyncMaxTS(ctx, clusterDCLocations, maxTSO); err != nil {
if err := globalAllocator.(*GlobalTSOAllocator).SyncMaxTS(ctx, clusterDCLocations, maxTSO, false); err != nil {
return &pdpb.Timestamp{}, err
}
return maxTSO, nil
Expand Down
Loading