Skip to content

Commit

Permalink
Add unused allocator group clean up
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Sep 18, 2020
1 parent 3127c5b commit e25418d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 26 deletions.
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func (s *Server) tsoAllocatorLoop() {

ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
s.tsoAllocatorManager.AllocatorDaemon(ctx, cancel)
s.tsoAllocatorManager.AllocatorDaemon(ctx)
log.Info("server is closed, exit allocator loop")
}

Expand Down
75 changes: 50 additions & 25 deletions server/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(parentCtx context.Context, a

// AllocatorDaemon is used to update every allocator's TSO and check whether we have
// any new local allocator that needs to be set up.
func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context, serverCtxCancel context.CancelFunc) {
func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
tsTicker := time.NewTicker(UpdateTimestampStep)
defer tsTicker.Stop()
checkerTicker := time.NewTicker(checkAllocatorStep)
Expand All @@ -302,30 +302,6 @@ func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context, serverCtx
}
}

// allocatorPatroller checks whether any new dc-location has been configured
// and, if so, sets up the corresponding Local TSO Allocator for it.
func (am *AllocatorManager) allocatorPatroller(serverCtx context.Context) {
	clusterDCLocations, err := am.GetClusterDCLocations()
	if err != nil {
		// Best effort: log and fall through; ranging over a nil map below is a no-op.
		log.Error("check new allocators failed, can't get cluster dc-locations", errs.ZapError(err))
	}
	existingGroups := am.getAllocatorGroups()
	for dcLocation := range clusterDCLocations {
		// Skip dc-locations that already have an allocator group.
		alreadySetUp := false
		for _, ag := range existingGroups {
			if ag.dcLocation == dcLocation {
				alreadySetUp = true
				break
			}
		}
		if alreadySetUp {
			continue
		}
		leadership := election.NewLeadership(
			am.member.Client(),
			am.getAllocatorPath(dcLocation),
			fmt.Sprintf("%s local allocator leader election", dcLocation),
		)
		if err := am.SetUpAllocator(serverCtx, dcLocation, leadership); err != nil {
			log.Error("check new allocators failed, can't set up a new local allocator", zap.String("dc-location", dcLocation), errs.ZapError(err))
		}
	}
}

// Update the Local TSO Allocator leaders' TSO in memory concurrently.
func (am *AllocatorManager) allocatorUpdater() {
// Filter out allocators without leadership and uninitialized
Expand Down Expand Up @@ -360,6 +336,55 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
}
}

// allocatorPatroller checks whether any new dc-location has been configured
// and, if so, sets up the corresponding Local TSO Allocator for it. It also
// cleans up allocator groups whose dc-location is no longer configured.
func (am *AllocatorManager) allocatorPatroller(serverCtx context.Context) {
	clusterDCLocations, err := am.GetClusterDCLocations()
	if err != nil {
		// Without the authoritative dc-location list we can neither set up new
		// allocators nor safely decide which ones are unused — the cleanup loop
		// below would otherwise delete every allocator group against an empty
		// list. Bail out and retry on the next patrol tick.
		log.Error("check new allocators failed, can't get cluster dc-locations", errs.ZapError(err))
		return
	}
	// Collect all dc-locations.
	// NOTE: zero length with pre-sized capacity — make([]string, len(...))
	// would leave empty-string placeholders that append then follows,
	// doubling the slice and feeding "" dc-locations to both loops below.
	dcLocations := make([]string, 0, len(clusterDCLocations))
	for dcLocation := range clusterDCLocations {
		dcLocations = append(dcLocations, dcLocation)
	}
	// Get all Local TSO Allocator groups (FilterDCLocation presumably excludes
	// the Global allocator here — TODO confirm filter semantics).
	allocatorGroups := am.getAllocatorGroups(FilterDCLocation(config.GlobalDCLocation))
	// Set up an allocator for every dc-location that doesn't have one yet.
	for _, dcLocation := range dcLocations {
		if slice.NoneOf(allocatorGroups, func(i int) bool {
			return allocatorGroups[i].dcLocation == dcLocation
		}) {
			if err := am.SetUpAllocator(serverCtx, dcLocation, election.NewLeadership(
				am.member.Client(),
				am.getAllocatorPath(dcLocation),
				fmt.Sprintf("%s local allocator leader election", dcLocation),
			)); err != nil {
				log.Error("check new allocators failed, can't set up a new local allocator", zap.String("dc-location", dcLocation), errs.ZapError(err))
				continue
			}
		}
	}
	// Clean up allocator groups whose dc-location is no longer configured.
	for _, ag := range allocatorGroups {
		if slice.NoneOf(dcLocations, func(i int) bool {
			return dcLocations[i] == ag.dcLocation
		}) {
			am.deleteAllocatorGroup(ag.dcLocation)
		}
	}
}

// deleteAllocatorGroup resets and removes the allocator group serving the
// given dc-location, if such a group exists.
func (am *AllocatorManager) deleteAllocatorGroup(dcLocation string) {
	am.Lock()
	defer am.Unlock()
	ag, exist := am.allocatorGroups[dcLocation]
	if exist {
		// Release the allocator's state and step down from leadership
		// before dropping the group from the map.
		ag.allocator.Reset()
		ag.leadership.Reset()
	}
	delete(am.allocatorGroups, dcLocation)
}

// HandleTSORequest forwards TSO allocation requests to correct TSO Allocators.
func (am *AllocatorManager) HandleTSORequest(dcLocation string, count uint32) (pdpb.Timestamp, error) {
am.RLock()
Expand Down

0 comments on commit e25418d

Please sign in to comment.