Skip to content

Commit

Permalink
tso: implement groupSplitPatroller to speed up the split process (tik…
Browse files Browse the repository at this point in the history
…v#6736)

ref tikv#5895, close tikv#6696

Implement `groupSplitPatroller` to speed up the split process.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent 9dee403 commit 837661b
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 14 deletions.
71 changes: 68 additions & 3 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
// of the primaries on this TSO server/pod have changed. A goroutine will periodically check
// do this check and re-distribute the primaries if necessary.
defaultPrimaryPriorityCheckInterval = 10 * time.Second
groupPatrolInterval = time.Minute
)

type state struct {
Expand All @@ -74,13 +75,16 @@ type state struct {
kgs [mcsutils.MaxKeyspaceGroupCountInUse]*endpoint.KeyspaceGroup
// keyspaceLookupTable is a map from keyspace to the keyspace group to which it belongs.
keyspaceLookupTable map[uint32]uint32
// splittingGroups is the cache of splitting keyspace group related information.
splittingGroups map[uint32]struct{}
}

func (s *state) initialize() {
s.keyspaceLookupTable = make(map[uint32]uint32)
s.splittingGroups = make(map[uint32]struct{})
}

func (s *state) deinitialize() {
func (s *state) deInitialize() {
log.Info("closing all keyspace groups")

s.Lock()
Expand Down Expand Up @@ -398,8 +402,9 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)
}

kgm.wg.Add(1)
kgm.wg.Add(2)
go kgm.primaryPriorityCheckLoop()
go kgm.groupSplitPatroller()

return nil
}
Expand All @@ -415,7 +420,7 @@ func (kgm *KeyspaceGroupManager) Close() {
// added/initialized after that.
kgm.cancel()
kgm.wg.Wait()
kgm.state.deinitialize()
kgm.state.deInitialize()

log.Info("keyspace group manager closed")
}
Expand Down Expand Up @@ -732,6 +737,10 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
}
kgm.kgs[group.ID] = group
kgm.ams[group.ID] = am
// If the group is the split target, add it to the splitting group map.
if group.IsSplitTarget() {
kgm.splittingGroups[group.ID] = struct{}{}
}
kgm.Unlock()
}

Expand Down Expand Up @@ -859,6 +868,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
// Check if the split is completed.
if oldGroup != nil && oldGroup.IsSplitTarget() && !newGroup.IsSplitting() {
kgm.ams[groupID].GetMember().(*member.Participant).SetCampaignChecker(nil)
delete(kgm.splittingGroups, groupID)
}
kgm.kgs[groupID] = newGroup
}
Expand Down Expand Up @@ -1322,3 +1332,58 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
return
}
}

// groupSplitPatroller is used to patrol the groups that are in the on-going
// split state and to check if we could speed up the split process.
func (kgm *KeyspaceGroupManager) groupSplitPatroller() {
defer kgm.wg.Done()
patrolInterval := groupPatrolInterval
failpoint.Inject("fastGroupSplitPatroller", func() {
patrolInterval = 200 * time.Millisecond
})
ticker := time.NewTicker(patrolInterval)
defer ticker.Stop()
log.Info("group split patroller is started",
zap.Duration("patrol-interval", patrolInterval))
for {
select {
case <-kgm.ctx.Done():
log.Info("group split patroller is exiting")
return
case <-ticker.C:
}
kgm.RLock()
if len(kgm.splittingGroups) == 0 {
kgm.RUnlock()
continue
}
var splittingGroups []uint32
for id := range kgm.splittingGroups {
splittingGroups = append(splittingGroups, id)
}
kgm.RUnlock()
for _, groupID := range splittingGroups {
am, group := kgm.getKeyspaceGroupMeta(groupID)
if !am.IsLeader() {
continue
}
if len(group.Keyspaces) == 0 {
log.Warn("abnormal keyspace group with empty keyspace list",
zap.Uint32("keyspace-group-id", groupID))
continue
}
log.Info("request tso for the splitting keyspace group",
zap.Uint32("keyspace-group-id", groupID),
zap.Uint32("keyspace-id", group.Keyspaces[0]))
// Request the TSO manually to speed up the split process.
_, _, err := kgm.HandleTSORequest(group.Keyspaces[0], groupID, GlobalDCLocation, 1)
if err != nil {
log.Warn("failed to request tso for the splitting keyspace group",
zap.Uint32("keyspace-group-id", groupID),
zap.Uint32("keyspace-id", group.Keyspaces[0]),
zap.Error(err))
continue
}
}
}
}
20 changes: 9 additions & 11 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestTSOKeyspaceGroupManager(t *testing.T) {

func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`))

var err error
suite.ctx, suite.cancel = context.WithCancel(context.Background())
Expand All @@ -81,6 +82,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownSuite() {
suite.cancel()
suite.tsoCluster.Destroy()
suite.cluster.Destroy()
suite.Require().NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller"))
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -276,17 +278,15 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
NewID: 2,
Keyspaces: []uint32{222, 333},
})
kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2)
re.Equal(uint32(2), kg2.ID)
re.Equal([]uint32{222, 333}, kg2.Keyspaces)
re.True(kg2.IsSplitTarget())
// Check the split TSO from keyspace group 2.
var splitTS pdpb.Timestamp
// Wait for the split to complete automatically even there is no TSO request from the outside.
testutil.Eventually(re, func() bool {
splitTS, err = suite.requestTSO(re, 222, 2)
return err == nil && tsoutil.CompareTimestamp(&splitTS, &pdpb.Timestamp{}) > 0
kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2)
re.Equal(uint32(2), kg2.ID)
re.Equal([]uint32{222, 333}, kg2.Keyspaces)
return !kg2.IsSplitting()
})
splitTS, err = suite.requestTSO(re, 222, 2)
// Check the split TSO from keyspace group 2 now.
splitTS, err := suite.requestTSO(re, 222, 2)
re.NoError(err)
re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0)
}
Expand Down Expand Up @@ -356,8 +356,6 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection
return len(member1.GetLeaderListenUrls()) > 0 && len(member2.GetLeaderListenUrls()) > 0
})
re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls())
// Finish the split.
handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2)
// Wait for the keyspace groups to finish the split.
waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{111}, []uint32{222, 333})
}
Expand Down

0 comments on commit 837661b

Please sign in to comment.