Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso, tests: implement the keyspace group merge checker #6625

Merged
merged 8 commits into from
Jun 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,11 @@ error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrKeyspaceGroupIsMerging"]
error = '''
the keyspace group %d is merging
'''

["PD:tso:ErrKeyspaceGroupNotInitialized"]
error = '''
the keyspace group %d isn't initialized
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
ErrKeyspaceGroupNotInitialized = errors.Normalize("the keyspace group %d isn't initialized", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupNotInitialized"))
ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned"))
ErrGetMinTS = errors.Normalize("get min ts failed, %s", errors.RFCCodeText("PD:tso:ErrGetMinTS"))
ErrKeyspaceGroupIsMerging = errors.Normalize("the keyspace group %d is merging", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIsMerging"))
)

// member errors
Expand Down
13 changes: 13 additions & 0 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1406,3 +1406,16 @@ func (am *AllocatorManager) GetLeaderAddr() string {
}
return leaderAddrs[0]
}

// Construct the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
func (am *AllocatorManager) getKeyspaceGroupTSPath(groupID uint32) string {
tsPath := ""
if am.kgID != mcsutils.DefaultKeyspaceGroupID {
tsPath = path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix)
}
return tsPath
}
14 changes: 1 addition & 13 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"errors"
"fmt"
"path"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -89,16 +88,6 @@ func NewGlobalTSOAllocator(
am *AllocatorManager,
startGlobalLeaderLoop bool,
) Allocator {
// Construct the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
tsPath := ""
if am.kgID != mcsutils.DefaultKeyspaceGroupID {
tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), globalTSOAllocatorEtcdPrefix)
}

ctx, cancel := context.WithCancel(ctx)
gta := &GlobalTSOAllocator{
ctx: ctx,
Expand All @@ -107,8 +96,7 @@ func NewGlobalTSOAllocator(
member: am.member,
timestampOracle: &timestampOracle{
client: am.member.GetLeadership().GetClient(),
rootPath: am.rootPath,
tsPath: tsPath,
tsPath: am.getKeyspaceGroupTSPath(am.kgID),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
Expand Down
216 changes: 209 additions & 7 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ import (
"github.com/tikv/pd/pkg/mcs/discovery"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand All @@ -50,6 +52,9 @@ const (
keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election"
// primaryKey is the key for keyspace group primary election.
primaryKey = "primary"
// mergingCheckInterval is the interval for merging check to see if the keyspace groups
// merging process could be moved forward.
mergingCheckInterval = 5 * time.Second
)

type state struct {
Expand Down Expand Up @@ -241,6 +246,9 @@ type KeyspaceGroupManager struct {
groupWatcher *etcdutil.LoopWatcher

primaryPathBuilder *kgPrimaryPathBuilder

// mergeCheckerCancelMap is the cancel function map for the merge checker of each keyspace group.
mergeCheckerCancelMap sync.Map // GroupID -> context.CancelFunc
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand Down Expand Up @@ -384,12 +392,9 @@ func (kgm *KeyspaceGroupManager) Close() {
}

func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool {
for _, member := range group.Members {
if member.Address == kgm.tsoServiceID.ServiceAddr {
return true
}
}
return false
return slice.AnyOf(group.Members, func(i int) bool {
return group.Members[i].Address == kgm.tsoServiceID.ServiceAddr
})
}

// updateKeyspaceGroup applies the given keyspace group. If the keyspace group is just assigned to
Expand All @@ -416,9 +421,25 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
return
}

oldAM, oldGroup := kgm.getKeyspaceGroupMeta(group.ID)
// If this host owns a replica of the keyspace group which is the merge target,
// it should run the merging checker when the merge state first time changes.
if !oldGroup.IsMergeTarget() && group.IsMergeTarget() {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(kgm.ctx)
kgm.mergeCheckerCancelMap.Store(group.ID, cancel)
kgm.wg.Add(1)
go kgm.mergingChecker(ctx, group.ID, group.MergeState.MergeList)
}
// If the merge state has been finished, cancel its merging checker.
if oldGroup.IsMergeTarget() && !group.IsMergeTarget() {
if cancel, loaded := kgm.mergeCheckerCancelMap.LoadAndDelete(group.ID); loaded && cancel != nil {
cancel.(context.CancelFunc)()
}
}

// If this host is already assigned a replica of this keyspace group, i.e., the election member
// is already initialized, just update the meta.
if oldAM, oldGroup := kgm.getKeyspaceGroupMeta(group.ID); oldAM != nil {
if oldAM != nil {
kgm.updateKeyspaceGroupMembership(oldGroup, group, true)
return
}
Expand Down Expand Up @@ -738,6 +759,10 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
err = kgm.checkTSOMerge(curKeyspaceGroupID)
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
ts, err = am.HandleRequest(dcLocation, count)
return ts, curKeyspaceGroupID, err
}
Expand Down Expand Up @@ -898,3 +923,180 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error {
kgm.kgs[id] = splitGroup
return nil
}

func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
kgm.Lock()
defer kgm.Unlock()
// Check if the keyspace group is in the merging state.
mergeTarget := kgm.kgs[id]
if !mergeTarget.IsMergeTarget() {
return nil
}
// Check if the HTTP client is initialized.
if kgm.httpClient == nil {
return nil
}
statusCode, err := apiutil.DoDelete(
kgm.httpClient,
kgm.cfg.GeBackendEndpoints()+keyspaceGroupsAPIPrefix+fmt.Sprintf("/%d/merge", id))
if err != nil {
return err
}
if statusCode != http.StatusOK {
log.Warn("failed to finish merging keyspace group",
zap.Uint32("keyspace-group-id", id),
zap.Int("status-code", statusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
// Pre-update the split keyspace group split state in memory.
mergeTarget.MergeState = nil
kgm.kgs[id] = mergeTarget
return nil
}

// mergingChecker is used to check if the keyspace group is in merge state, and if so, it will
// make sure the newly merged TSO keep consistent with the original ones.
func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTargetID uint32, mergeList []uint32) {
log.Info("start to merge the keyspace group",
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList))
defer logutil.LogPanic()
defer kgm.wg.Done()

checkTicker := time.NewTicker(mergingCheckInterval)
defer checkTicker.Stop()
// Prepare the merge map.
mergeMap := make(map[uint32]struct{}, len(mergeList))
for _, id := range mergeList {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
mergeMap[id] = struct{}{}
}

for {
select {
case <-ctx.Done():
log.Info("merging checker is closed",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList))
return
case <-checkTicker.C:
}
// Check if current TSO node is the merge target TSO primary node.
am, err := kgm.GetAllocatorManager(mergeTargetID)
if err != nil {
log.Warn("unable to get the merge target allocator manager",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("keyspace-group-id", mergeTargetID),
zap.Any("merge-list", mergeList),
zap.Error(err))
continue
}
// If the current TSO node is not the merge target TSO primary node,
// we still need to keep this loop running to avoid unexpected primary changes.
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
if !am.IsLeader() {
log.Debug("current tso node is not the merge target primary",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList))
continue
}
// Check if the keyspace group primaries in the merge map are all gone.
if len(mergeMap) != 0 {
for id := range mergeMap {
leaderPath := path.Join(kgm.primaryPathBuilder.getKeyspaceGroupIDPath(id), primaryKey)
val, err := kgm.tsoSvcStorage.Load(leaderPath)
if err != nil {
log.Error("failed to check if the keyspace group primary in the merge list has gone",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList),
zap.Uint32("merge-id", id),
zap.Any("remaining", mergeMap),
zap.Error(err))
continue
}
if len(val) == 0 {
delete(mergeMap, id)
}
}
}
if len(mergeMap) > 0 {
continue
}
// All the keyspace group primaries in the merge list are gone,
// update the newly merged TSO to make sure it is greater than the original ones.
var mergedTS time.Time
for _, id := range mergeList {
ts, err := kgm.tsoSvcStorage.LoadTimestamp(am.getKeyspaceGroupTSPath(id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we support "merge all non-default keyspace groups to default keyspace group option"? In this case, we don't need to query saved timestamp for every group in the merge list but just LoadTimestamp(KeyspaceGroupTSPathPrefix) once.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice idea. I will implement this after this PR is merged.

if err != nil || ts == typeutil.ZeroTime {
log.Error("failed to load the keyspace group TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList),
zap.Uint32("merge-id", id),
zap.Time("ts", ts),
zap.Error(err))
mergedTS = typeutil.ZeroTime
break
}
if ts.After(mergedTS) {
mergedTS = ts
}
}
if mergedTS == typeutil.ZeroTime {
continue
}
// Update the newly merged TSO.
// TODO: support the Local TSO Allocator.
allocator, err := am.GetAllocator(GlobalDCLocation)
if err != nil {
log.Error("failed to get the allocator",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList),
zap.Error(err))
continue
}
err = allocator.SetTSO(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the timestamps of other groups need to be deleted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's possible these keyspace groups being split out again, it's necessary to keep them to make sure the TSO consistency.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we merge groups, the merged groups will use the maximum timestamp of all. If we want to split out again, the split group will use the maximum timestamp as the baseline. Why is it related to consistency?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I didn't consider the TSO synchronization process of split before. You're right. It's reasonable to delete them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some investigation, I think this cleaning function can be implemented in a separate PR, and I plan to put it in a separate goroutine. Let's focus the current PR on the basic functional implementation of merging.

tsoutil.GenerateTS(tsoutil.GenerateTimestamp(mergedTS, 1)),
true, true)
if err != nil {
log.Error("failed to update the newly merged TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList),
zap.Time("merged-ts", mergedTS),
zap.Error(err))
continue
}
// Finish the merge.
err = kgm.finishMergeKeyspaceGroup(mergeTargetID)
if err != nil {
log.Error("failed to finish the merge",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList),
zap.Error(err))
continue
}
log.Info("finished merging keyspace group",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
zap.Any("merge-list", mergeList),
zap.Time("merged-ts", mergedTS))
return
}
}

// Reject any request if the keyspace group is in merging state,
// we need to wait for the merging checker to finish the TSO merging.
func (kgm *KeyspaceGroupManager) checkTSOMerge(
keyspaceGroupID uint32,
) error {
_, group := kgm.getKeyspaceGroupMeta(keyspaceGroupID)
if !group.IsMerging() {
return nil
}
return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(keyspaceGroupID)
}
1 change: 0 additions & 1 deletion pkg/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func NewLocalTSOAllocator(
leadership: leadership,
timestampOracle: &timestampOracle{
client: leadership.GetClient(),
rootPath: am.rootPath,
tsPath: tsPath,
storage: am.storage,
saveInterval: am.saveInterval,
Expand Down
3 changes: 1 addition & 2 deletions pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ type tsoObject struct {

// timestampOracle is used to maintain the logic of TSO.
type timestampOracle struct {
client *clientv3.Client
rootPath string
client *clientv3.Client
// When tsPath is empty, it means that it is a global timestampOracle.
tsPath string
storage endpoint.TSOStorage
Expand Down
Loading