Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor logging in tso service to separate OP and Cloud #6514

Merged
merged 2 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 66 additions & 14 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,14 @@ func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLoc
go am.allocatorLeaderLoop(parentCtx, localTSOAllocator)
}

// getGroupID returns the keyspace group ID of the allocator manager.
func (am *AllocatorManager) getGroupID() uint32 {
if am == nil {
return 0
}
return am.kgID
}

// GetTimestampPath returns the timestamp path in etcd for the given DCLocation.
func (am *AllocatorManager) GetTimestampPath(dcLocation string) string {
if am == nil {
Expand All @@ -297,13 +305,13 @@ func (am *AllocatorManager) tsoAllocatorLoop() {
defer am.svcLoopWG.Done()

am.AllocatorDaemon(am.ctx)
log.Info("exit allocator loop")
log.Info("exit allocator loop", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
}

// close is used to shutdown TSO Allocator updating daemon.
// tso service call this function to shutdown the loop here, but pd manages its own loop.
func (am *AllocatorManager) close() {
log.Info("closing the allocator manager")
log.Info("closing the allocator manager", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))

if allocatorGroup, exist := am.getAllocatorGroup(GlobalDCLocation); exist {
allocatorGroup.allocator.(*GlobalTSOAllocator).close()
Expand All @@ -312,7 +320,7 @@ func (am *AllocatorManager) close() {
am.cancel()
am.svcLoopWG.Wait()

log.Info("closed the allocator manager")
log.Info("closed the allocator manager", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
}

// GetMember returns the ElectionMember of this AllocatorManager.
Expand All @@ -327,6 +335,7 @@ func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error {
serverID := am.member.ID()
if err := am.checkDCLocationUpperLimit(dcLocation); err != nil {
log.Error("check dc-location upper limit failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID != 0),
zap.Int("upper-limit", int(math.Pow(2, MaxSuffixBits))-1),
zap.String("dc-location", dcLocation),
zap.String("server-name", serverName),
Expand All @@ -345,12 +354,14 @@ func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error {
}
if !resp.Succeeded {
log.Warn("write dc-location configuration into etcd failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation),
zap.String("server-name", serverName),
zap.Uint64("server-id", serverID))
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
log.Info("write dc-location configuration into etcd",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation),
zap.String("server-name", serverName),
zap.Uint64("server-id", serverID))
Expand Down Expand Up @@ -393,6 +404,7 @@ func (am *AllocatorManager) GetClusterDCLocationsFromEtcd() (clusterDCLocations
dcLocation := string(kv.Value)
if err != nil {
log.Warn("get server id and dcLocation from etcd failed, invalid server id",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Any("splitted-serverPath", serverPath),
zap.String("dc-location", dcLocation),
errs.ZapError(err))
Expand Down Expand Up @@ -427,7 +439,9 @@ func (am *AllocatorManager) CleanUpDCLocation() error {
} else if !resp.Succeeded {
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
log.Info("delete the dc-location key previously written in etcd", zap.Uint64("server-id", serverID))
log.Info("delete the dc-location key previously written in etcd",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Uint64("server-id", serverID))
go am.ClusterDCLocationChecker()
return nil
}
Expand Down Expand Up @@ -493,6 +507,7 @@ func (am *AllocatorManager) getLocalTSOAllocatorPath() string {
func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) {
defer logutil.LogPanic()
defer log.Info("server is closed, return local tso allocator leader loop",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.String("local-tso-allocator-name", am.member.Name()))
for {
Expand All @@ -509,18 +524,21 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
}
if allocatorLeader != nil {
log.Info("start to watch allocator leader",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader),
zap.String("local-tso-allocator-name", am.member.Name()))
// WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)
log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()))
}

// Check the next-leader key
nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation())
if err != nil {
log.Error("get next leader from etcd failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
errs.ZapError(err))
time.Sleep(200 * time.Millisecond)
Expand All @@ -530,6 +548,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
if nextLeader != 0 {
if nextLeader != am.member.ID() {
log.Info("skip campaigning of the local tso allocator leader and check later",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("server-name", am.member.Name()),
zap.Uint64("server-id", am.member.ID()),
zap.Uint64("next-leader-id", nextLeader))
Expand All @@ -544,6 +563,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation())
if err != nil {
log.Error("get dc-location info from pd leader failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
errs.ZapError(err))
// PD leader hasn't been elected out, wait for the campaign
Expand All @@ -554,6 +574,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
}
if !ok || dcLocationInfo.Suffix <= 0 || dcLocationInfo.MaxTs == nil {
log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("wait-duration", checkStep.String()))
Expand Down Expand Up @@ -590,6 +611,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
isNextLeader bool,
) {
log.Info("start to campaign local tso allocator leader",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
Expand All @@ -613,11 +635,13 @@ func (am *AllocatorManager) campaignAllocatorLeader(
if err := allocator.CampaignAllocatorLeader(am.leaderLease, cmps...); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("failed to campaign local tso allocator leader due to txn conflict, another allocator may campaign successfully",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
} else {
log.Error("failed to campaign local tso allocator leader due to etcd error",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()),
Expand All @@ -633,16 +657,19 @@ func (am *AllocatorManager) campaignAllocatorLeader(
// Maintain the Local TSO Allocator leader
go allocator.KeepAllocatorLeader(ctx)
log.Info("campaign local tso allocator leader ok",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))

log.Info("initialize the local TSO allocator",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
if err := allocator.Initialize(int(dcLocationInfo.Suffix)); err != nil {
log.Error("failed to initialize the local TSO allocator",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
errs.ZapError(err))
Expand All @@ -651,6 +678,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
if dcLocationInfo.GetMaxTs().GetPhysical() != 0 {
if err := allocator.WriteTSO(dcLocationInfo.GetMaxTs()); err != nil {
log.Error("failed to write the max local TSO after member changed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
errs.ZapError(err))
Expand All @@ -662,6 +690,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
// The next leader is me, delete it to finish campaigning
am.deleteNextLeaderID(allocator.GetDCLocation())
log.Info("local tso allocator leader is ready to serve",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
Expand All @@ -674,6 +703,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
case <-leaderTicker.C:
if !allocator.IsAllocatorLeader() {
log.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
Expand All @@ -682,6 +712,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed, reset the local tso allocator",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
Expand All @@ -693,7 +724,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
// AllocatorDaemon is used to update every allocator's TSO and check whether we have
// any new local allocator that needs to be set up.
func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
log.Info("entering into allocator daemon")
log.Info("entering into allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))

// allocatorPatroller should only work when enableLocalTSO is true to
// set up the new Local TSO Allocator in time.
Expand Down Expand Up @@ -730,7 +761,7 @@ func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
// PS: ClusterDCLocationChecker and PriorityChecker are time consuming and low frequent to run,
// we should run them concurrently to speed up the progress.
case <-ctx.Done():
log.Info("exit allocator daemon")
log.Info("exit allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
return
}
}
Expand All @@ -757,17 +788,20 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
case <-ag.ctx.Done():
// Resetting the allocator will clear TSO in memory
ag.allocator.Reset()
log.Info("exit the allocator update loop")
log.Info("exit the allocator update loop", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
return
default:
}
if !ag.leadership.Check() {
log.Info("allocator doesn't campaign leadership yet", zap.String("dc-location", ag.dcLocation))
log.Info("allocator doesn't campaign leadership yet",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", ag.dcLocation))
time.Sleep(200 * time.Millisecond)
return
}
if err := ag.allocator.UpdateTSO(); err != nil {
log.Warn("failed to update allocator's timestamp",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", ag.dcLocation),
zap.String("name", am.member.Name()),
errs.ZapError(err))
Expand Down Expand Up @@ -813,7 +847,9 @@ func (am *AllocatorManager) ClusterDCLocationChecker() {
}
newClusterDCLocations, err := am.GetClusterDCLocationsFromEtcd()
if err != nil {
log.Error("get cluster dc-locations from etcd failed", errs.ZapError(err))
log.Error("get cluster dc-locations from etcd failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
errs.ZapError(err))
return
}
am.mu.Lock()
Expand Down Expand Up @@ -844,7 +880,9 @@ func (am *AllocatorManager) ClusterDCLocationChecker() {
suffix, err := am.getOrCreateLocalTSOSuffix(dcLocation)
if err != nil {
log.Warn("get or create the local tso suffix failed",
zap.String("dc-location", dcLocation), errs.ZapError(err))
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation),
errs.ZapError(err))
continue
}
if suffix > am.mu.maxSuffix {
Expand All @@ -856,7 +894,9 @@ func (am *AllocatorManager) ClusterDCLocationChecker() {
// Follower should check and update the am.mu.maxSuffix
maxSuffix, err := am.getMaxLocalTSOSuffix()
if err != nil {
log.Error("get the max local tso suffix from etcd failed", errs.ZapError(err))
log.Error("get the max local tso suffix from etcd failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
errs.ZapError(err))
// Rollback the new dc-locations we update before
for _, dcLocation := range newDCLocations {
delete(am.mu.clusterDCLocations, dcLocation)
Expand Down Expand Up @@ -901,6 +941,7 @@ func (am *AllocatorManager) getOrCreateLocalTSOSuffix(dcLocation string) (int32,
}
if !txnResp.Succeeded {
log.Warn("write local tso suffix into etcd failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation),
zap.String("local-tso-suffix", localTSOSuffixValue),
zap.String("server-name", am.member.Name()),
Expand Down Expand Up @@ -983,12 +1024,14 @@ func (am *AllocatorManager) PriorityChecker() {
// find this allocator's dc-location isn't the same with server of dc-2 but is same with itself.
if allocatorGroup.dcLocation != leaderServerDCLocation && allocatorGroup.dcLocation == myServerDCLocation {
log.Info("try to move the local tso allocator",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Uint64("old-leader-id", leaderServerID),
zap.String("old-dc-location", leaderServerDCLocation),
zap.Uint64("next-leader-id", serverID),
zap.String("next-dc-location", myServerDCLocation))
if err := am.transferLocalAllocator(allocatorGroup.dcLocation, am.member.ID()); err != nil {
log.Error("move the local tso allocator failed",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Uint64("old-leader-id", leaderServerID),
zap.String("old-dc-location", leaderServerDCLocation),
zap.Uint64("next-leader-id", serverID),
Expand All @@ -1005,12 +1048,16 @@ func (am *AllocatorManager) PriorityChecker() {
nextLeader, err := am.getNextLeaderID(allocatorGroup.dcLocation)
if err != nil {
log.Error("get next leader from etcd failed",
zap.String("dc-location", allocatorGroup.dcLocation), errs.ZapError(err))
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", allocatorGroup.dcLocation),
errs.ZapError(err))
continue
}
// nextLeader is not empty and isn't same with the server ID, resign the leader
if nextLeader != 0 && nextLeader != serverID {
log.Info("next leader key found, resign current leader", zap.Uint64("nextLeaderID", nextLeader))
log.Info("next leader key found, resign current leader",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Uint64("nextLeaderID", nextLeader))
am.ResetAllocatorGroup(allocatorGroup.dcLocation)
}
}
Expand Down Expand Up @@ -1286,6 +1333,7 @@ func (am *AllocatorManager) setGRPCConn(newConn *grpc.ClientConn, addr string) {
if _, ok := am.localAllocatorConn.clientConns[addr]; ok {
newConn.Close()
log.Debug("use old connection",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("target", newConn.Target()),
zap.String("state", newConn.GetState().String()))
return
Expand All @@ -1303,6 +1351,7 @@ func (am *AllocatorManager) transferLocalAllocator(dcLocation string, serverID u
if err != nil {
err = errs.ErrEtcdGrantLease.Wrap(err).GenWithStackByCause()
log.Error("failed to grant the lease of the next leader key",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation),
zap.Uint64("serverID", serverID),
errs.ZapError(err))
Expand All @@ -1315,12 +1364,15 @@ func (am *AllocatorManager) transferLocalAllocator(dcLocation string, serverID u
if err != nil {
err = errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause()
log.Error("failed to write next leader key into etcd",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation), zap.Uint64("serverID", serverID),
errs.ZapError(err))
return err
}
if !resp.Succeeded {
log.Warn("write next leader id into etcd unsuccessfully", zap.String("dc-location", dcLocation))
log.Warn("write next leader id into etcd unsuccessfully",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("dc-location", dcLocation))
return errs.ErrEtcdTxnConflict.GenWithStack("write next leader id into etcd unsuccessfully")
}
return nil
Expand Down
Loading