Skip to content

Commit

Permalink
Print keyspace-group-id zap field in the tso log conditionally (#6514)
Browse files Browse the repository at this point in the history
ref #5895

To keep the logging info in on-premises clean, we only print keyspace-group-id zap field
for the non-default keyspace group id.

Signed-off-by: Bin Shi <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
binshi-bing and ti-chi-bot[bot] authored May 25, 2023
1 parent 78bc7c1 commit 73b91b0
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 29 deletions.
80 changes: 66 additions & 14 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,14 @@ func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLoc
go am.allocatorLeaderLoop(parentCtx, localTSOAllocator)
}

// getGroupID returns the keyspace group ID of the allocator manager.
func (am *AllocatorManager) getGroupID() uint32 {
if am == nil {
return 0
}
return am.kgID
}

// GetTimestampPath returns the timestamp path in etcd for the given DCLocation.
func (am *AllocatorManager) GetTimestampPath(dcLocation string) string {
if am == nil {
Expand All @@ -297,13 +305,13 @@ func (am *AllocatorManager) tsoAllocatorLoop() {
defer am.svcLoopWG.Done()

am.AllocatorDaemon(am.ctx)
log.Info("exit allocator loop")
log.Info("exit allocator loop", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
}

// close is used to shutdown TSO Allocator updating daemon.
// tso service call this function to shutdown the loop here, but pd manages its own loop.
func (am *AllocatorManager) close() {
log.Info("closing the allocator manager")
log.Info("closing the allocator manager", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))

if allocatorGroup, exist := am.getAllocatorGroup(GlobalDCLocation); exist {
allocatorGroup.allocator.(*GlobalTSOAllocator).close()
Expand All @@ -312,7 +320,7 @@ func (am *AllocatorManager) close() {
am.cancel()
am.svcLoopWG.Wait()

log.Info("closed the allocator manager")
log.Info("closed the allocator manager", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
}

// GetMember returns the ElectionMember of this AllocatorManager.
Expand All @@ -327,6 +335,7 @@ func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error {
serverID := am.member.ID()
if err := am.checkDCLocationUpperLimit(dcLocation); err != nil {
log.Error("check dc-location upper limit failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID != 0),
zap.Int("upper-limit", int(math.Pow(2, MaxSuffixBits))-1),
zap.String("dc-location", dcLocation),
zap.String("server-name", serverName),
Expand All @@ -345,12 +354,14 @@ func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error {
}
if !resp.Succeeded {
log.Warn("write dc-location configuration into etcd failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation),
zap.String("server-name", serverName),
zap.Uint64("server-id", serverID))
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
log.Info("write dc-location configuration into etcd",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation),
zap.String("server-name", serverName),
zap.Uint64("server-id", serverID))
Expand Down Expand Up @@ -393,6 +404,7 @@ func (am *AllocatorManager) GetClusterDCLocationsFromEtcd() (clusterDCLocations
dcLocation := string(kv.Value)
if err != nil {
log.Warn("get server id and dcLocation from etcd failed, invalid server id",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Any("splitted-serverPath", serverPath),
zap.String("dc-location", dcLocation),
errs.ZapError(err))
Expand Down Expand Up @@ -427,7 +439,9 @@ func (am *AllocatorManager) CleanUpDCLocation() error {
} else if !resp.Succeeded {
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
log.Info("delete the dc-location key previously written in etcd", zap.Uint64("server-id", serverID))
log.Info("delete the dc-location key previously written in etcd",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Uint64("server-id", serverID))
go am.ClusterDCLocationChecker()
return nil
}
Expand Down Expand Up @@ -493,6 +507,7 @@ func (am *AllocatorManager) getLocalTSOAllocatorPath() string {
func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) {
defer logutil.LogPanic()
defer log.Info("server is closed, return local tso allocator leader loop",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.String("local-tso-allocator-name", am.member.Name()))
for {
Expand All @@ -509,18 +524,21 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
}
if allocatorLeader != nil {
log.Info("start to watch allocator leader",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader),
zap.String("local-tso-allocator-name", am.member.Name()))
// WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)
log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()))
}

// Check the next-leader key
nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation())
if err != nil {
log.Error("get next leader from etcd failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
errs.ZapError(err))
time.Sleep(200 * time.Millisecond)
Expand All @@ -530,6 +548,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
if nextLeader != 0 {
if nextLeader != am.member.ID() {
log.Info("skip campaigning of the local tso allocator leader and check later",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("server-name", am.member.Name()),
zap.Uint64("server-id", am.member.ID()),
zap.Uint64("next-leader-id", nextLeader))
Expand All @@ -544,6 +563,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation())
if err != nil {
log.Error("get dc-location info from pd leader failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
errs.ZapError(err))
// PD leader hasn't been elected out, wait for the campaign
Expand All @@ -554,6 +574,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
}
if !ok || dcLocationInfo.Suffix <= 0 || dcLocationInfo.MaxTs == nil {
log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("wait-duration", checkStep.String()))
Expand Down Expand Up @@ -590,6 +611,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
isNextLeader bool,
) {
log.Info("start to campaign local tso allocator leader",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
Expand All @@ -613,11 +635,13 @@ func (am *AllocatorManager) campaignAllocatorLeader(
if err := allocator.CampaignAllocatorLeader(am.leaderLease, cmps...); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("failed to campaign local tso allocator leader due to txn conflict, another allocator may campaign successfully",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
} else {
log.Error("failed to campaign local tso allocator leader due to etcd error",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()),
Expand All @@ -633,16 +657,19 @@ func (am *AllocatorManager) campaignAllocatorLeader(
// Maintain the Local TSO Allocator leader
go allocator.KeepAllocatorLeader(ctx)
log.Info("campaign local tso allocator leader ok",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))

log.Info("initialize the local TSO allocator",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
if err := allocator.Initialize(int(dcLocationInfo.Suffix)); err != nil {
log.Error("failed to initialize the local TSO allocator",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
errs.ZapError(err))
Expand All @@ -651,6 +678,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
if dcLocationInfo.GetMaxTs().GetPhysical() != 0 {
if err := allocator.WriteTSO(dcLocationInfo.GetMaxTs()); err != nil {
log.Error("failed to write the max local TSO after member changed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
errs.ZapError(err))
Expand All @@ -662,6 +690,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
// The next leader is me, delete it to finish campaigning
am.deleteNextLeaderID(allocator.GetDCLocation())
log.Info("local tso allocator leader is ready to serve",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
Expand All @@ -674,6 +703,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
case <-leaderTicker.C:
if !allocator.IsAllocatorLeader() {
log.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
Expand All @@ -682,6 +712,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed, reset the local tso allocator",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
Expand All @@ -693,7 +724,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
// AllocatorDaemon is used to update every allocator's TSO and check whether we have
// any new local allocator that needs to be set up.
func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
log.Info("entering into allocator daemon")
log.Info("entering into allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))

// allocatorPatroller should only work when enableLocalTSO is true to
// set up the new Local TSO Allocator in time.
Expand Down Expand Up @@ -730,7 +761,7 @@ func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
// PS: ClusterDCLocationChecker and PriorityChecker are time consuming and low frequent to run,
// we should run them concurrently to speed up the progress.
case <-ctx.Done():
log.Info("exit allocator daemon")
log.Info("exit allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
return
}
}
Expand All @@ -757,17 +788,20 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
case <-ag.ctx.Done():
// Resetting the allocator will clear TSO in memory
ag.allocator.Reset()
log.Info("exit the allocator update loop")
log.Info("exit the allocator update loop", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
return
default:
}
if !ag.leadership.Check() {
log.Info("allocator doesn't campaign leadership yet", zap.String("dc-location", ag.dcLocation))
log.Info("allocator doesn't campaign leadership yet",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", ag.dcLocation))
time.Sleep(200 * time.Millisecond)
return
}
if err := ag.allocator.UpdateTSO(); err != nil {
log.Warn("failed to update allocator's timestamp",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", ag.dcLocation),
zap.String("name", am.member.Name()),
errs.ZapError(err))
Expand Down Expand Up @@ -813,7 +847,9 @@ func (am *AllocatorManager) ClusterDCLocationChecker() {
}
newClusterDCLocations, err := am.GetClusterDCLocationsFromEtcd()
if err != nil {
log.Error("get cluster dc-locations from etcd failed", errs.ZapError(err))
log.Error("get cluster dc-locations from etcd failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
errs.ZapError(err))
return
}
am.mu.Lock()
Expand Down Expand Up @@ -844,7 +880,9 @@ func (am *AllocatorManager) ClusterDCLocationChecker() {
suffix, err := am.getOrCreateLocalTSOSuffix(dcLocation)
if err != nil {
log.Warn("get or create the local tso suffix failed",
zap.String("dc-location", dcLocation), errs.ZapError(err))
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation),
errs.ZapError(err))
continue
}
if suffix > am.mu.maxSuffix {
Expand All @@ -856,7 +894,9 @@ func (am *AllocatorManager) ClusterDCLocationChecker() {
// Follower should check and update the am.mu.maxSuffix
maxSuffix, err := am.getMaxLocalTSOSuffix()
if err != nil {
log.Error("get the max local tso suffix from etcd failed", errs.ZapError(err))
log.Error("get the max local tso suffix from etcd failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
errs.ZapError(err))
// Rollback the new dc-locations we update before
for _, dcLocation := range newDCLocations {
delete(am.mu.clusterDCLocations, dcLocation)
Expand Down Expand Up @@ -901,6 +941,7 @@ func (am *AllocatorManager) getOrCreateLocalTSOSuffix(dcLocation string) (int32,
}
if !txnResp.Succeeded {
log.Warn("write local tso suffix into etcd failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation),
zap.String("local-tso-suffix", localTSOSuffixValue),
zap.String("server-name", am.member.Name()),
Expand Down Expand Up @@ -983,12 +1024,14 @@ func (am *AllocatorManager) PriorityChecker() {
// find this allocator's dc-location isn't the same with server of dc-2 but is same with itself.
if allocatorGroup.dcLocation != leaderServerDCLocation && allocatorGroup.dcLocation == myServerDCLocation {
log.Info("try to move the local tso allocator",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Uint64("old-leader-id", leaderServerID),
zap.String("old-dc-location", leaderServerDCLocation),
zap.Uint64("next-leader-id", serverID),
zap.String("next-dc-location", myServerDCLocation))
if err := am.transferLocalAllocator(allocatorGroup.dcLocation, am.member.ID()); err != nil {
log.Error("move the local tso allocator failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Uint64("old-leader-id", leaderServerID),
zap.String("old-dc-location", leaderServerDCLocation),
zap.Uint64("next-leader-id", serverID),
Expand All @@ -1005,12 +1048,16 @@ func (am *AllocatorManager) PriorityChecker() {
nextLeader, err := am.getNextLeaderID(allocatorGroup.dcLocation)
if err != nil {
log.Error("get next leader from etcd failed",
zap.String("dc-location", allocatorGroup.dcLocation), errs.ZapError(err))
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocatorGroup.dcLocation),
errs.ZapError(err))
continue
}
// nextLeader is not empty and isn't same with the server ID, resign the leader
if nextLeader != 0 && nextLeader != serverID {
log.Info("next leader key found, resign current leader", zap.Uint64("nextLeaderID", nextLeader))
log.Info("next leader key found, resign current leader",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Uint64("nextLeaderID", nextLeader))
am.ResetAllocatorGroup(allocatorGroup.dcLocation)
}
}
Expand Down Expand Up @@ -1286,6 +1333,7 @@ func (am *AllocatorManager) setGRPCConn(newConn *grpc.ClientConn, addr string) {
if _, ok := am.localAllocatorConn.clientConns[addr]; ok {
newConn.Close()
log.Debug("use old connection",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("target", newConn.Target()),
zap.String("state", newConn.GetState().String()))
return
Expand All @@ -1303,6 +1351,7 @@ func (am *AllocatorManager) transferLocalAllocator(dcLocation string, serverID u
if err != nil {
err = errs.ErrEtcdGrantLease.Wrap(err).GenWithStackByCause()
log.Error("failed to grant the lease of the next leader key",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation),
zap.Uint64("serverID", serverID),
errs.ZapError(err))
Expand All @@ -1315,12 +1364,15 @@ func (am *AllocatorManager) transferLocalAllocator(dcLocation string, serverID u
if err != nil {
err = errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause()
log.Error("failed to write next leader key into etcd",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation), zap.Uint64("serverID", serverID),
errs.ZapError(err))
return err
}
if !resp.Succeeded {
log.Warn("write next leader id into etcd unsuccessfully", zap.String("dc-location", dcLocation))
log.Warn("write next leader id into etcd unsuccessfully",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation))
return errs.ErrEtcdTxnConflict.GenWithStack("write next leader id into etcd unsuccessfully")
}
return nil
Expand Down
Loading

0 comments on commit 73b91b0

Please sign in to comment.